zoukankan      html  css  js  c++  java
  • logstash 配置文件实例

    这个配置文件记不起来是从那个地方下载的来的了,感谢那位无私的朋友

    input {
      beats {            #shipper 端用的是 filebeat,所以用这个插件
        port => 510        #开510端口,shipper 端发到 这里
      }
      beats {
        port => 511
        codec => json    #原始日志是json格式,这里指定json格式,就可以解析好日志,下面filter 就不用写grok表达式了
      }
      tcp {                #shipper用rsyslog,不像filebeat会自带元数据,所以用tcp开个端口就好
        port => 512
        type => commamd    #增加个字段,以此来查找此类型日志
      }
      tcp {
        port => 513
        type => win_event
        codec => multiline {    #win的event日志,是多行的,用多行插件来做全合并
          pattern => "^%{NGINXERR_DATE}"    #这个正则变量,是一种时间日期格式,就是说日志以这种日期开头,后面的都算一条日志
          negate => true
          what => "previous"
        }
      }
    }

    filter {
      if [input_type] == "log" {    #判断值,filebeat 的元数据(即是每一条日志都会附带上的数据)就有这个字段。
      ruby {    #ruby插件,下面init 是声明有多少个字段。code 是声明 使用什么分隔符,分隔出每一个字段的值。要搞这个,前提是你的日志格式,指定生成以XXX符号为间隔啦
        init => "@kname = ['http_x_forwarded_for','timestamp','request','status ','body_bytes_sent','http_referer','http_user_agent','remote_addr','http_head','upstream_addr','request_time','upstream_response_time']"
        code => "new_event = LogStash::Event.new(Hash[@kname.zip(event['message'].split('|'))]); new_event.remove('@timestamp');event.append(new_event)"
      }
      if [request] {    #还是判断,这是上面分好的字段,这里对request字段再分割一下
        ruby {            #同上啦
          init => "@kname = ['method','URL','verb']"
          code => "new_event = LogStash::Event.new(Hash[@kname.zip(event['request'].split(' '))]); new_event.remove('@timestamp');event.append(new_event)"
        }
      }
      if [URL] {        #跟上面的request判断 一个意思,就是要把一个字段,分的更细的方便日志分析
        ruby {
          init => "@kname = ['uri','url_args']"
          code => "new_event = LogStash::Event.new(Hash[@kname.zip(event['URL'].split('?'))]); new_event.remove('@timestamp');event.append(new_event)"
          remove_field => ["request","message","URL"]    #这是每一个插件都有的命令,把多余的字段移除掉吧。
            }
      }


      mutate {        #匹配插件,常用的
        convert => [    #转换,上面切割好字段,就像数据库表一样,都是str类型,下面就是把字段转换成对应的类型。举例:值是 - ,也会被转成0 。多个值的会变成数组。
          "body_bytes_sent" , "integer",
          "request_time", "float",
          "upstream_response_time", "float"
        ]
      }
      date {    #时间插件
        match => ["timestamp" , "ISO8601"]    #timestamp这个字段是日志里带的那个时间,而@timestamp 这个是默认的时间(此描述不准),通常我们多用这个@timestamp画图作为时间轴。ISO8601就是一种时间格式。详见搜下logstash N级目录下grok-patterns文件
      }
    }

      if [input_type] == "test" {
        mutate {split => ["ups_resp_time", ","]}    #就是一个字段可能有多个值。用逗号隔开。
        mutate {convert => ["ups_resp_time", "float"]} #然后再转成浮点型
        mutate {convert => ["request_time", "float"]}
        date {match => ["timestamp","ISO8601"]}
      }

      if [type] == "commad" {    #这是一条别的日志,下面就是grok 解析日志的写法。日志长这样:2016-06-17 08:42:59 ## root@/dev/pts/1 ---> 121.33.26.18 51101 120.26.13.18 22 ##  snmpnetstat -V
        grok {  match => {"message" => "%{NGINXERR_DATE:log_timestamp} %{NOTSPACE:xx} %{USERNAME:user}@%{NOTSPACE:tty} %{NOTSPACE:xxx} %{IPV4:chient_ip} %{NUMBER:client_port} %{IPV4:server_ip} %{NUMBER:server_port} %{NOTSPACE:xxxx} %{GREEDYDATA:command}"}
          remove_field => ['xx']
          remove_field => ['xxx']
          remove_field => ['xxxx']
          remove_field => ['message']  }
        date { match => ["log_timestamp" , "yyyy-MM-dd HH:mm:ss"] }
      }

      if [type] == "win_event" {    #在input模块那里,已用多行插件处理了。这里当成一行写grok就好。
        grok {
          match => {"message" => "%{NGINXERR_DATE:winlog_timestamp} %{NOTSPACE:win_hostname} %{NOTSPACE:Level} %{NUMBER:event_id} %{GREEDYDATA:event}" }
          remove_field => ['message']
        }
        date {
          match => ["winlog_timestamp" , "yyyy-MM-dd HH:mm:ss"]
        }
      }

    # 下面这些呢,因为kibana显示问题,想看的直观些,于是在这里做个替换。存进es里时,就直接这中文啦。kibana展示,自然出是中文
    if [host] == "10.168.24.70"  { mutate { replace => { "host" => "精武门" } } }
    if [host] == "10.117.16.241" { mutate { replace => { "host" => "nginx_1" } } }
    if [host] == "10.117.9.162"  { mutate { replace => { "host" => "nginx_2" } } }
    if [host] == "10.51.8.234" { mutate { replace => { "host" => "监控" } } }
    if [host] == "10.47.69.198" { mutate { replace => { "host" => "APP" } } }
    if [win_hostname] == "iZ23syf95oaZ" { mutate { replace => { "win_hostname" => "数据库" } } }
    if [win_hostname] == "iZ234bmxy7wZ" { mutate { replace => { "win_hostname" => "网站" } } }
    if [win_hostname] == "iZ233n40vi4Z" { mutate { replace => { "win_hostname" => "资讯" } } }
    if [win_hostname] == "iZ23z5w0bj3Z" { mutate { replace => { "win_hostname" => "DataCenter" } } }

    }

    # 这个没啥好说的吧。我就一个索引名。默认完了。
    output {
      elasticsearch { hosts => "127.0.0.1:9200" }
    }

  • 相关阅读:
    jekyll+github搭建个人博客总结
    ES6-let命令
    Ajax-快速上手前后端交互
    第一次项目总结——校园博览会
    Python获取exe文件版本
    @JsonFormat与@DateTimeFormat注解的使用
    前后端时间转化
    左右flex布局
    fastjson将json字符串转化成map的五种方法
    RestTemplate 发送post请求
  • 原文地址:https://www.cnblogs.com/smail-bao/p/5695656.html
Copyright © 2011-2022 走看看