zoukankan      html  css  js  c++  java
  • LogStash的Filter的使用

    最近在项目中使用LogStash做日志的采集和过滤,感觉LogStash还是很强大的。

    input {
         file{
             path => "/XXX/syslog.txt"
             start_position => beginning
             codec => multiline{
                 patterns_dir => ["/XX/logstash-1.5.3/patterns"]
                 pattern => "^%{MESSAGE}"
                 negate => true
                 what => "previous"
             }
         }
    }
    filter{
        mutate{
         split => ["message","|"]
            add_field =>   {
                "tmp" => "%{[message][0]}"
            }
            add_field =>   {
                "DeviceProduct" => "%{[message][2]}"
            }
            add_field =>   {
                "DeviceVersion" => "%{[message][3]}"
            }
            add_field =>   {
                "Signature ID" => "%{[message][4]}"
            }
            add_field =>   {
                "Name" => "%{[message][5]}"
            }
        }
    
        mutate{
         split => ["tmp",":"]
            add_field =>   {
                "tmp1" => "%{[tmp][1]}"
            }
            add_field =>   {
                "Version" => "%{[tmp][2]}"
            }
            remove_field => [ "tmp" ]
        }
    
        grok{
           patterns_dir => ["/XXX/logstash-1.5.3/patterns"]
           match => {"tmp1" => "%{TYPE:type}"}
           remove_field => [ "tmp1"]
        }
    
        kv{
           include_keys => ["eventId", "msg", "end", "mrt", "modelConfidence", "severity", "relevance","assetCriticality","priority","art","rt","cs1","cs2","cs3","locality","cs2Label","cs3Label","cs4Label","flexString1Label","ahost","agt","av","atz","aid","at","dvc","deviceZoneID","deviceZoneURI","dtz","eventAnnotationStageUpdateTime","eventAnnotationModificationTime","eventAnnotationAuditTrail","eventAnnotationVersion","eventAnnotationFlags","eventAnnotationEndTime","eventAnnotationManagerReceiptTime","_cefVer","ad.arcSightEventPath"]
        }
        mutate{
         split => ["ad.arcSightEventPath",","]
            add_field =>   {
                "arcSightEventPath" => "%{[ad.arcSightEventPath][0]}"
            }
            remove_field => [ "ad.arcSightEventPath" ]
            remove_field => [ "message" ]
        }
    
    }
    output{
        kafka{
            topic_id => "rawlog"
            batch_num_messages => 20
            broker_list => "10.3.162.193:39192,10.3.162.194:39192,10.3.162.195:39192"
            codec => "json"
        }
        stdout{
           codec => rubydebug
        }
    

    input:接入数据源

    filter:对数据源进行过滤

    output: 输出的

    其中最重要的是filter的处理,目前我们的需求是需要对字符串进行key-value的提取

    1、使用了mutate中的split,能通过分割符对分本处理。

    2、通过grok使用正则对字符串进行截取处理。

    3、使用kv 提取所有的key-value

  • 相关阅读:
    Subgraph Search Over Large Graph Database
    聚类算法:K-means 算法(k均值算法)
    MongoDB 学习笔记之 TTL索引,部分索引和文本索引
    MongoDB 学习笔记之 索引选项和重建索引
    快学Scala 第二十一课 (初始化trait的抽象字段)
    MongoDB 学习笔记之 分析器和explain
    MongoDB 学习笔记之 检测存储引擎
    ELK 学习笔记之 elasticsearch基本概念和CRUD
    ELK 学习笔记之 elasticsearch head插件安装
    ELK 学习笔记之 elasticsearch环境搭建
  • 原文地址:https://www.cnblogs.com/qq27271609/p/4762562.html
Copyright © 2011-2022 走看看