zoukankan      html  css  js  c++  java
  • [elk]logstash的最佳实战-项目实战

    重点参考:
    http://blog.csdn.net/qq1032355091/article/details/52953837
    不得不说这是一个伟大的项目实战,是正式踏入logstash门槛的捷径

    Logstash的使用

    logstash支持把配置写入文件 xxx.conf,然后通过读取配置文件来采集数据
    ./bin/logstash –f xxx.conf

    logstash最终会把数据封装成json类型,默认会添加@timestamp时间字段、host主机字段、type字段。原消息数据会整个封装进message字段。如果数据处理过程中,用户解析添加了多个字段,则最终结果又会多出.。

    Logstash的结构

    Logstash由 input,filter,output三个组件去完成采集数据
    如下是一个logstash的配置实例:

    input {  
        file {  
            type => "log"  
            path => "/log/*/*.log"  
            discover_interval => 10  
            start_position => "beginning"   
        }  
    }  
    filter {  
    }   
    output {  
        elasticsearch {  
        index => "log-%{+YYYY.MM.dd}"  
        hosts => ["172.16.0.14:9200", "172.16.0.15:9200", "172.16.0.16:9200"]  
        }  
        stdout {codec => rubydebug}  
    }  
    

    input

    input组件负责读取数据,可以采用file插件读取本地文本文件,stdin插件读取标准输入数据,tcp插件读取网络数据,log4j插件读取log4j发送过来的数据等等。

    filter

    filter插件负责过滤解析input读取的数据,可以用grok插件正则解析数据,date插件解析日期,json插件解析json等等。

    output

    output插件负责将filter处理过的数据输出。可以用elasticsearch插件输出到es,rediss插件输出到redis,stdout插件标准输出,kafka插件输出到kafka等等
    trade.log日志采集。

    trade.log日志采集

    配置内容如下:

    [ruby] view plain copy
    input {  
        file {  
            type => "tradelog"  
            path => "/home/elk/his/trade.log*"  
            discover_interval => 5  
            start_position => "beginning"  
              
            sincedb_path => "/home/elk/myconf/sincedb_trade.txt"  
            sincedb_write_interval => 15  
              
            codec => plain { charset => "GB2312" }  
        }      
    }  
      
    filter {  
        grok {  
            match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}|%{WORD:name}|Oid: %{WORD:oid}|IP: %{IP:ip}|MAC: %{GREEDYDATA:mac}|%{WORD:result}|%{GREEDYDA 
            match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}|%{WORD:name}|Oid: %{WORD:oid}|IP: %{IP:ip}|MAC: %{GREEDYDATA:mac}|%{WORD:result}|"  }  
            match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}|%{WORD:name}|Oid: %{WORD:oid}|IP: %{IP:ip}|MAC: %{GREEDYDATA:mac}|"  }      
            match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}|IP: %{IP:ip}|MAC: %{GREEDYDATA:mac}|%{WORD:result}|"  }  
            match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}|IP: %{IP:ip}|MAC: %{GREEDYDATA:mac}|" }  
            remove_field  => "message"  
        }      
        date {  
            match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]  
        }      
    }  
      
    output {  
        if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {  
            stdout {codec => rubydebug}  
              
            elasticsearch {  
                index => "log4j-tradelog"  
                hosts => ["168.7.1.67:9200"]  
                manage_template => true  
                template_overwrite => true  
                template_name => "log4j-tradelog"  
                template => "/home/elk/myconf/tradelog_template.json"  
            }  
        }      
    }  
    

    input

    1. start_position:设置beginning保证从文件开头读取数据。
    2. path:填入文件路径。
    3. type:自定义类型为tradelog,由用户任意填写。
    4. codec:设置读取文件的编码为GB2312,用户也可以设置为UTF-8等等
    5. discover_interval:每隔多久去检查一次被监听的 path 下是否有新文件,默认值是15秒
    6. sincedb_path:设置记录源文件读取位置的文件,默认为文件所在位置的隐藏文件。
    7. sincedb_write_interval:每隔15秒记录一下文件读取位置

    filter

    日志格式如下:

    2016-05-09 09:49:13,817 [] [ACTIVE] ExecuteThread: '1' for queue: 'weblogic.kernel.Default (self-tuning)' [INFO ] com.c.command.StartLogCommand.execute(StartLogCommand.java:46) - FrontPage 
    2016-05-09 09:49:13,928 [] [ACTIVE] ExecuteThread: '2' for queue: 'weblogic.kernel.Default (self-tuning)' [INFO ] com.c.command.EndLogCommand.execute(EndLogCommand.java:44) - FrontPageAdve|
    

    grok插件

    因为该日志中有5种格式如下,以最后几个我需要的字段为例说明:

    交易名|登录名|编号|ip地址|mac地址|返回结果|异常信息  
    交易名|登录名|编号|ip地址|mac地址|返回结果|  
    交易名|登录名|编号|ip地址|mac地址|  
    交易名|ip地址|mac地址|返回结果|  
    交易名|ip地址|mac地址|
    

    所以采用5种正则规则去匹配,logstash默认会从上到下按规则去匹配,直到匹配上为止。(日志中的多行错误信息,匹配不上,logstash会在tags字段添加”_ grokparsefailure”,所以后面输出的时候会用if条件.)

    注意:5种正则规则的上下顺序,下面的规则放在上面会导致可能内容解析不全,比如源数据是:请求交易名|操作员登录名|操作员编号|ip地址|mac地址|返回结果|异常信息,如果按照“请求交易名|ip地址|mac地址。

    logstash内置了很多正则匹配规则,用户可以直接调用这些规则来解析,例如%{WORD:result} 表示调用WORD规则(即识别字符串规则)来解析并最后赋值给result字段(result字段会自动创建)。
    下面以第一条match规则为例来说明:

    match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}|%{WORD:name}|Oid: %{WORD:oid}|IP: %{IP:ip}|MAC: %{GREEDYDATA:mac}|%{WORD:result}|%{GREEDYDATA:excep 
    

    首先行首使用DATESTAMP_CN规则来识别时间,并赋值给logdate字段名;然后.识别任意字符串(.代表任意一个字符,包括特殊字符,代表个数是任意个);然后使用WORD规则(即匹配字符串规则,不包含特殊字.。

    注意:[@metadata]表示logdate这个字段在数据处理过程中只是一个临时字段,最后不会真的输出。避免了使用remove_field手动移除字段。

    注意:logstash默认不支持”YYYY-MM-dd HH:mm:ss,SSS”格式的时间匹配,需要自己定义正则表达式到logstash-2.3.1/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns文件:

    DATE_CN %{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}  
    DATESTAMP_CN %{DATE_CN} %{TIME}  
    

    注意:logstash的正则表达式采用ruby语言正则表达式,具体语法可以参考网上。

    remove_field => "message"表示解析完成之后删除原来的 message字段,避免重复。

    date插件

    match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
    logstash默认的时间字段是@timestamp,如果不设置的话,默认是数据采集时候的时间,这里我们将日志打印的时间(即解析出的logdate字段的内容)设置为@timestamp内容,方便之后kibana根据时间检索。

    注意:解析出来的@timestamp会比实际时间早8个小时,这是内置utc时间格式问题,kibana页面展示的时候会根据浏览器当前时区自动转换回来,这里不作处理。

    output

    if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {  
        stdout {codec => rubydebug}  
          
        elasticsearch {  
            index => "log4j-tradelog"  
            hosts => ["134.7.1.67:9200"]  
            manage_template => true  
            template_overwrite => true  
            template => "/home/elk/myconf/tradelog_template.json"  
        }  
    }
    

    前面提到过,如果grok解析失败,会在tags字段自动添加_grokparsefailure值,如果date解析失败,会在tags字段自动添加_dateparsefailure值。所以最后的输出,我们采用条件过滤掉解析失败的行内容。最终的。

    elasticsearch插件

    index:要导入的es索引
    host:es地址,有多个节点配置多个节点
    template:指定elasticsearch的mapping模板文件,如果该索引不存在,logstash会根据这个mapping模板去自动创建索引。

    stdout插件

    rubydebug标准输出,便于调试,可以不使用该插件。

    最终解析出结果示例如下:

    {  
          "@version" => "1",  
        "@timestamp" => "2016-05-09T01:44:48.366Z",  
              "path" => "/home/elk/e.log",  
              "host" => "ccc7",  
              "type" => "tradelog",  
           "opeType" => "WZQry",  
              "name" => "lhcsssz2",  
               "oid" => "abzzak",  
                "ip" => "192.168.44.105",  
               "mac" => "A1345C05-26C1-4253-8845-01CFCA8EC4FD",  
            "result" => "Success"  
    } 
    

    error.log采集

    日志实例:

    2016-09-29 17:13:24,184|ncid=1100343164|oid=acaatv|loginName=zhenglw1|transId=Withdraw|traceId=N/A-_A-88C4D-043|exceptType=com.intenft.exception.AppRTException|exceptCode=CORESYST_TXN_NATI到
    

    配置文件如下:

    input {  
        file {  
            path => "/home/elk/his/error.log*"  
            type => "errorlog"  
            start_position => "beginning"  
            discover_interval => 5  
              
            codec => multiline {  
                charset => "GB2312"  
                pattern => "^%{DATESTAMP_CN}"  
                negate => true  
                what => "next"          
            }  
              
            sincedb_path => "/home/elk/myconf/sincedb_error.txt"  
            sincedb_write_interval => 15  
        }      
    }  
      
    filter {  
        grok {  
            match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]}%{GREEDYDATA:[@metadata][keyvalue]}" }  
            remove_field  => "message"  
        }      
        date {  
            match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]  
        }  
        kv {  
            source => "[@metadata][keyvalue]"  
            field_split => "|"  
            value_split => "="  
        }  
    }  
      
    output {  
        if "multiline" in [tags] {  
            stdout {codec => rubydebug}  
            elasticsearch {  
                index => "log4j-errorlog-3"  
                hosts => ["168.7.1.67:9200"]  
                manage_template => true  
                template_overwrite => true  
                template => "/home/elk/myconf/errorlog_template.json"  
            }  
        }      
    }  
    

    input

    1. start_position:设置beginning保证从文件开头读取数据。
    2. path:填入文件路径。
    3. type:自定义类型为tradelog,由用户任意填写。
    4. codec:multiline插件
    5. discover_interval:每隔多久去检查一次被监听的 path 下是否有新文件,默认值是15秒
    6. sincedb_path:设置记录源文件读取位置的文件,默认为文件所在位置的隐藏文件。
    7. sincedb_write_interval:每隔15秒记录一下文件读取位置

    multiline插件

    logstash默认读取一行内容为一个消息,因为错误日志包含堆栈信息,多行对应一个消息,所以使用该插件合并多行为一条消息。
    pattern:以”YYYY-MM-dd HH:mm:ss,SSS”格式开头的匹配为一条消息。
    negate:true 表示正向使用该patttern
    what:匹配到的日期属于下一条消息
    charset:设置文件编码
    filter

    grok插件

    匹配日期到logdata字段,匹配剩下的所有字符串到keyvalue临时字段,”GREEDYDATA”正则表达式为”.*”

    date插件

    match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]  
    

    logstash默认的时间字段是@timestamp,如果不设置的话,默认是数据采集时候的时间,这里我们将日志打印的时间(即解析出的logdate字段的内容)设置为@timestamp内容,方便之后kibana根据时间检索。

    注意:解析出来的@timestamp会比实际时间早8个小时,这是内置utc时间格式问题,kibana页面展示的时候会根据浏览器当前时区自动转换回来,这里不作处理。

    kv插件

    source:解析前面grok获取的keyvalue字段

    (比如:|ncid=1100783164|oid=acaatv|loginName=zhew1|transId=Withdraw|traceId=N/A-_A-88C4D-043|exceptType=com.inteft.exception.AppRTException|exceptCode=CORESYST_TXN_NATIVE_89042|exceptMsg= .
    

    field_split:按”|”切分key-value对
    value_split:按”=”切分key 和 value,最终切分出来key作为字段名,value作为字段值

    output

    output {  
        if "multiline" in [tags] {  
            stdout {codec => rubydebug}  
            elasticsearch {  
                index => "log4j-errorlog-3"  
                hosts => ["168.7.1.67:9200"]  
                manage_template => true  
                template_overwrite => true  
                template => "/home/elk/myconf/errorlog_template.json"  
            }  
        }     
    }  
    

    该日志有2种格式的日志,一种是单行的错误信息日志,一种是多行的包含堆栈信息的日志,这2种日志内容重复,那么只需要解析单行格式的日志。kv插件解析多行格式的日志时, tags字段里没有”multipline”值.。

    elasticsearch插件

    index:要导入的es索引
    host:es地址,有多个节点配置多个节点
    template:指定elasticsearch的mapping模板文件,如果该索引不存在,logstash会根据这个mapping模板去自动创建索引。

    最终解析的结果示例如下:

    {  
        "@timestamp" => "2016-09-29T09:14:22.194Z",  
          "@version" => "1",  
              "tags" => [  
            [0] "multiline"  
        ],  
              "path" => "/home/elk/stst.log",  
              "host" => "ci7",  
              "type" => "sttlog",  
              "ncid" => "1143164",  
               "oid" => "acav",  
         "loginName" => "zhew1",  
           "transId" => "MyQuery",  
           "traceId" => "N/A8C4E-047",  
        "exceptType" => "com.exception.AppRTException",  
        "exceptCode" => "CORESYNATIVE_82243",  
         "exceptMsg" => "对不起!根据账号获取客户信息错误"  
    }  
    

    总结:

    注意:
    logstash filter中的每一个插件都有add_field,remove_field,add_tag,remove_tag 4个功能。

    附录:

    mapping模板文件

    tradelog:

    {  
        "template": "log4j-tradelog*",  
        "settings": {  
            "index.number_of_shards": 3,  
            "number_of_replicas": 0  
        },  
        "mappings": {  
            "tradelog": {  
                "_all": {  
                    "enabled": false  
                },  
                "properties": {  
                    "@timestamp": {  
                        "type": "date",  
                        "format": "strict_date_optional_time||epoch_millis",  
                        "doc_values": true  
                    },  
                    "@version": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "exception": {  
                        "type": "string",  
                        "index": "analyzed"  
                    },  
                    "path": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "host": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "ip": {  
                        "type": "ip",  
                        "index": "not_analyzed"  
                    },  
                    "logger_name": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "mac": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "name": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "oid": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "opeType": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "priority": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "result": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "type": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    }  
                }  
            }  
        }  
    }  
    

    error.log

    {  
        "template": "log4j-errorlog*",  
        "settings": {  
            "index.number_of_shards": 3,  
            "number_of_replicas": 0  
        },  
        "mappings": {  
            "errorlog": {  
                "_all": {  
                    "enabled": false  
                },  
                "properties": {  
                    "host": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "ncid": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "type": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "@version": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "exceptType": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "@timestamp": {  
                        "format": "strict_date_optional_time||epoch_millis",  
                        "type": "date"  
                    },  
                    "exceptCode": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "transId": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "priority": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "oid": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "traceId": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "exceptMsg": {  
                        "type": "string",  
                        "index": "analyzed"  
                    },  
                    "path": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "logger_name": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    },  
                    "loginName": {  
                        "type": "string",  
                        "index": "not_analyzed"  
                    }  
                }  
            }  
        }  
    }  
    
  • 相关阅读:
    android--从手动存取->View Model->Live Data->Data Binding
    android--------解决Entities and POJOs must have a usable public constructor
    开课第一周周总结
    Pandas Series: sum()方法
    .Net Core/Framework之Nginx反向代理后获取客户端IP等数据探索
    readonly与disabled的区别
    html使用frame框架导航跳转至指定的节的用法
    HTML+Css让网页自动适应电脑手机屏幕
    仿Quora的免费问答网站程序
    WebGL 纹理颜色原理
  • 原文地址:https://www.cnblogs.com/iiiiher/p/7929379.html
Copyright © 2011-2022 走看看