  • Parsing nested JSON data with Logstash

    1. Source files

      1. The original log file looks like this:

    2019-10-28 09:49:44:947 [http-nio-8080-exec-23] INFO  [siftLog][qewrw123ffwer2323fdsafd] - logTime:2019-10-28 09:49:25.833-receiveTime:2019-10-28 09:49:44.044-{"area":"","frontInitTime":0,"initiatePaymentMode":"plugin_manual","network":"电信","os":"Microsoft Windows 7","payStatus":"1","reqs":[{"curlCode":"0","end":"2019-10-28 09:49:25.233","errorCode":"","errorDesc":"","totalTime":2153}],"settleAccountsTime":0}
    

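      First we need a grok pattern for the part of the line that precedes the JSON. These leading fields carry little value in production, so they are not split in much detail.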

      DATETIME %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?
      ACCESSLOG %{DATETIME:logTime} \[%{DATA:threadName}\] %{DATA:loglevel}  \[%{DATA:logType}\]\[%{DATA:appId}\] - logTime:%{DATETIME:logTime2}-receiveTime:%{DATETIME:receiveTime}-%{GREEDYDATA:jsonMsg}
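
      The prefix pattern can be sanity-checked outside Logstash. The following is a minimal Python sketch, not the grok engine itself: it assumes that DATA behaves like a lazy `.*?`, GREEDYDATA like `.*`, and that SECOND accepts a fractional part introduced by `:`, `.` or `,`. The sample line is the one above with its JSON payload shortened for readability.

```python
import re

# Approximate regex for the DATETIME grok pattern (an assumption, not grok's
# exact definition). The optional fraction covers both "44:947" and "25.833".
DATETIME = r"\d{4}-\d{2}-\d{2}[T ]\d{2}:?\d{2}(?::?\d{2}(?:[:.,]\d+)?)?"

# Approximate regex for ACCESSLOG. Literal square brackets are escaped,
# otherwise the regex engine would read them as a character class.
ACCESSLOG = re.compile(
    rf"(?P<logTime>{DATETIME}) \[(?P<threadName>.*?)\] (?P<loglevel>.*?)  "
    rf"\[(?P<logType>.*?)\]\[(?P<appId>.*?)\] - "
    rf"logTime:(?P<logTime2>{DATETIME})-receiveTime:(?P<receiveTime>{DATETIME})-"
    rf"(?P<jsonMsg>.*)"
)

# Sample line from the log, with the JSON part shortened.
LINE = (
    '2019-10-28 09:49:44:947 [http-nio-8080-exec-23] INFO  '
    '[siftLog][qewrw123ffwer2323fdsafd] - '
    'logTime:2019-10-28 09:49:25.833-receiveTime:2019-10-28 09:49:44.044-'
    '{"area":"","payStatus":"1"}'
)

fields = ACCESSLOG.match(LINE).groupdict()
for name, value in fields.items():
    print(f"{name}: {value}")
```

      Everything after the last timestamp ends up in `jsonMsg`, which is the field the json filter reads in the configuration that follows.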

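      The JSON in this file has another JSON object nested inside it, so the nested object has to be pulled out and parsed separately. The Logstash configuration therefore looks like this: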

    input {
      kafka {
        #bootstrap_servers => "kafka-service.ops:9092"
        bootstrap_servers => "172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"
        topics => ["test-grok"]
        codec => "json"
        type => "test-grok"
      }
    }
    
    filter {
      if [type] == "test-grok" {
        grok {
            patterns_dir => [ "/opt/appl/logstash/patterns" ]
            match => { "message" => "%{ACCESSLOG}" }
        }
        mutate {
          # gsub takes a regex, so the literal brackets must be escaped
          gsub => [
            "jsonMsg", "\[", "",
            "jsonMsg", "\]", ""
          ]
        }
        json {
          source => "jsonMsg"
        }
        mutate {
          add_field => { "reqs_json" => "%{reqs}" }
        }
        json {
          source => "reqs_json"
          remove_field => ["reqs","reqs_json","message","jsonMsg"]
        }
      }
    
      ruby {
        code => "event.timestamp.time.localtime"
      }
    
    }
    
    output {
      elasticsearch {
        hosts => ["172.27.27.220:9200","172.27.27.221:9200","172.27.27.222:9200"]
        index => "logstash-test-grok-%{+YYYY.MM.dd}"
        template_overwrite => true
      }
    }
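
      What the filter chain does to `jsonMsg` can be followed step by step in plain Python. This is only an illustrative sketch of the same idea (strip the brackets, parse the outer JSON, then lift the nested `reqs` keys to the top level); the function name `flatten_event` is made up for the example. It also shows the limits of the bracket-stripping trick: it assumes `reqs` holds exactly one object and that no string value contains `[` or `]`.

```python
import json

# jsonMsg as captured by grok from the sample log line.
JSON_MSG = (
    '{"area":"","frontInitTime":0,"initiatePaymentMode":"plugin_manual",'
    '"network":"电信","os":"Microsoft Windows 7","payStatus":"1",'
    '"reqs":[{"curlCode":"0","end":"2019-10-28 09:49:25.233",'
    '"errorCode":"","errorDesc":"","totalTime":2153}],'
    '"settleAccountsTime":0}'
)


def flatten_event(json_msg: str) -> dict:
    """Hypothetical helper mirroring the mutate/json/json filter sequence."""
    # Same effect as the two gsub rules: "reqs":[{...}] becomes "reqs":{...}.
    stripped = json_msg.replace("[", "").replace("]", "")
    # First json filter: parse the outer document.
    event = json.loads(stripped)
    # Second json filter plus remove_field: promote the nested keys, drop "reqs".
    nested = event.pop("reqs")
    event.update(nested)
    return event


event = flatten_event(JSON_MSG)
print(sorted(event))
```

      If `reqs` ever carries two or more objects, the stripped text is no longer valid JSON, so parsing fails on such events.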
    

      

      2. The original log file looks like this:

    [2019-10-28 10:01:01.169] [Thread-13086] INFO  [192.168.2.1, 192.168.1.1, 192.168.1.2_1572_smallTrade] [INTERFACE] - [HTTP] [request] - {"latitude":"","cardCode":"","memberCouponNo":"","transAmount":"900","hbFqNum":"","confirmCode":"9357","couponAmount":"","lastCost":"2360","memberMobile":"","timestamp":"1572228060000","longitude":""}
    

      Only log lines that contain the keyword lastCost are needed, so the Filebeat configuration is:

    - type: log
      enabled: true
      paths:
        - /opt/appl/tomcat/logs/test/test.log
      include_lines: ['.*lastCost.*']
      tail_files: true
      fields:
        type: interface
        log_module: test-interface
    output.kafka:
      enabled: true
      hosts: ["172.27.27.220:9092", "172.27.27.221:9092", "172.27.27.222:9092"]
      topic: '%{[fields][type]}'
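
      The effect of `include_lines` can be pictured as a simple regex filter over the log lines. The sketch below is an approximation in Python, assuming the expressions are applied as an unanchored search (in which case the shorter pattern `lastCost` would select the same lines); the two sample lines are shortened and the second one is invented for contrast.

```python
import re

# Same expression as in the Filebeat configuration.
include_lines = [re.compile(r".*lastCost.*")]


def keep(line: str) -> bool:
    """Return True when any include pattern is found in the line."""
    return any(pattern.search(line) for pattern in include_lines)


lines = [
    '[2019-10-28 10:01:01.169] [Thread-13086] INFO  ... - {"transAmount":"900","lastCost":"2360"}',
    '[2019-10-28 10:01:02.000] [Thread-13087] INFO  ... - {"transAmount":"900"}',  # invented line
]

kept = [line for line in lines if keep(line)]
print(len(kept))
```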
    

      

      The developers put the client IP first in the list of IPs that makes up the fourth field, so that IP has to be extracted on its own for analysis.

      DATETIME %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?

      

    input {
           kafka {
             bootstrap_servers => "172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"
             topics => ["interface"]
             codec => "json"
             type => "test-interface"
           }
    }
    
    filter {
            if [type] == "test-interface" {
                    grok {
                            patterns_dir => [ "/opt/logstash/patters" ]
                            match => { "message" => "\[%{DATETIME:log_timestamp}\] \[%{DATA:ThreadName}\] %{LOGLEVEL:logLevel}  \[%{DATA:IP}\] \[%{DATA:InterfaceTag}\] - \[%{DATA:Protocol}\] \[%{DATA:LogType}\] - %{GREEDYDATA:jsonMsg2}" }
                    }
                    json {
                            source => "jsonMsg2"
                            remove_field => [ "jsonMsg2","message" ]
                    }
                    mutate {
                            convert => { "lastCost" => "float" }
                            split => { "IP" => ", " }
                            add_field => {
                                    "clientIp" => "%{[IP][0]}"
                                    "proxyIp" => "%{[IP][1]}"
                                    "time" => "%{[IP][2]}"
                            }
                    }
                    geoip {
                            source => "clientIp"
                            #database => "/opt/logstash-interface/Geoip/GeoLite2-City_20191022/GeoLite2-City.mmdb"
                    }
            }
            ruby {
                    code => "event.timestamp.time.localtime"
            }
    }
    
    output {
    	elasticsearch {
    		hosts => ["172.27.27.220:9200","172.27.27.221:9200","172.27.27.222:9200"]
    		index => "logstash-test-interface-%{+YYYY.MM.dd}"
    		template_overwrite => true
    	}
    }
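
      The same extraction can be traced in Python to see which values land in which fields. This is a rough sketch under the same assumptions as before (DATA as a lazy `.*?`, GREEDYDATA as `.*`, LOGLEVEL simplified to a run of letters); it is not the Logstash implementation, and the geoip lookup is left out.

```python
import json
import re

# Approximate regex for the grok expression, with literal brackets escaped.
INTERFACE = re.compile(
    r"\[(?P<log_timestamp>.*?)\] \[(?P<ThreadName>.*?)\] (?P<logLevel>[A-Za-z]+)  "
    r"\[(?P<IP>.*?)\] \[(?P<InterfaceTag>.*?)\] - "
    r"\[(?P<Protocol>.*?)\] \[(?P<LogType>.*?)\] - (?P<jsonMsg2>.*)"
)

LINE = (
    '[2019-10-28 10:01:01.169] [Thread-13086] INFO  '
    '[192.168.2.1, 192.168.1.1, 192.168.1.2_1572_smallTrade] [INTERFACE] - '
    '[HTTP] [request] - {"latitude":"","cardCode":"","memberCouponNo":"",'
    '"transAmount":"900","hbFqNum":"","confirmCode":"9357","couponAmount":"",'
    '"lastCost":"2360","memberMobile":"","timestamp":"1572228060000","longitude":""}'
)

event = INTERFACE.match(LINE).groupdict()

# json filter: merge the payload into the event and drop the raw string.
event.update(json.loads(event.pop("jsonMsg2")))

# mutate: convert lastCost to float, split IP on ", ", then add the new fields.
event["lastCost"] = float(event["lastCost"])
event["IP"] = event["IP"].split(", ")
event["clientIp"], event["proxyIp"], event["time"] = event["IP"]

print(event["clientIp"], event["proxyIp"], event["lastCost"])
```

      Only `clientIp` is handed to geoip in the configuration, which is why it has to be separated from the rest of the bracketed list first.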
    

      

  • Original article: https://www.cnblogs.com/jcici/p/11750690.html