zoukankan      html  css  js  c++  java
  • Logstash:处理多个input

    Logstash:处理多个input

    Logstash的整个pipleline分为三个部分:

    • input插件:提取数据。 这可以来自日志文件,TCP或UDP侦听器,若干协议特定插件(如syslog或IRC)之一,甚至是排队系统(如Redis,AQMP或Kafka)。 此阶段使用围绕事件来源的元数据标记传入事件。
    • filter 插件:插件转换并丰富数据
    • output插件: 将已处理的事件加载到其他内容中,例如ElasticSearch或其他文档数据库,或排队系统,如Redis,AQMP或Kafka。 它还可以配置为与API通信。 也可以将像PagerDuty这样的东西连接到Logstash输出。

    这里的input可以支持多个input,同时多个worker可以处理filter及output:

    Logstash配置文件

    Logstash的配置文件如下:

    # cat multi-input.conf
    
        input {
          file {
            path => "/Users/liuxg/data/multi-input/apache.log"
          	start_position => "beginning"
            sincedb_path => "/dev/null"
            # ignore_older => 100000
            type => "apache"
          }
        }
         
        input {
          file {
            path => "/Users/liuxg/data/multi-input/apache-daily-access.log"
          	start_position => "beginning"
            sincedb_path => "/dev/null"
            type => "daily"
          }
        }
         
        filter {
          	grok {
            	match => {
              		"message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
            	}
          	}
         
        	if[type] == "apache" {
        		mutate {
        	  		add_tag => ["apache"]
        	  	}
        	}
         
        	if [type] == "daily" {
        		mutate {
        			add_tag => ["daily"]
        		}
        	} 
        }
         
         
        output {
        	stdout {
        		codec => rubydebug
        	}
         
        	if "apache" in [tags] {
        	  	elasticsearch {
        	    	index => "apache_log"
        	    	template => "/Users/liuxg/data/apache_template.json"
        	    	template_name => "apache_elastic_example"
        	    	template_overwrite => true
        	  }	
        	}
         
        	if "daily" in [tags] {
        	  	elasticsearch {
        	    	index => "apache_daily"
        	    	template => "/Users/liuxg/data/apache_template.json"
        	    	template_name => "apache_elastic_example"
        	    	template_overwrite => true
        	  }	
        	}	
        }
    

    为了说明问题的方便,使用了两个input。它们分别对应不同的log文件。对于这两个input,也使用了不同的type来表示:apache和daily。尽管它们的格式是一样的,它们共同使用同样的一个grok filter,但是我们还是想分别对它们进行处理。为此,添加了一个tag。也可以添加一个field来进行区别。在output的部分,根据在filter部分设置的tag来对它们输出到不同的index里。

    运行Logstash

    可以通过如下的命令来运行:

    ./bin/logstash -f ~/data/multi-input/multi-input.conf
    

    当运行这个例子的时候,需要根据自己存放multi-input.conf文件的位置改变而改变上面的命令。

    根据显示的结果可以看出来daily的事件最早被处理及输出。接着apache的数据才开始处理。在实际的应用中,我们可能有不同的数据源,比如来自其它beats的监听某个端口的数据。

    可以在Kibana中看到我们最终的index数据.

  • 相关阅读:
    BZOJ2821 作诗(Poetize) 【分块】
    BZOJ2724 蒲公英 【分块】
    Codeforces 17E Palisection 【Manacher】
    BZOJ2565 最长双回文串 【Manacher】
    Codeforces 25E Test 【Hash】
    CODEVS3013 单词背诵 【Hash】【MAP】
    HDU2825 Wireless Password 【AC自动机】【状压DP】
    HDU2896 病毒侵袭 【AC自动机】
    HDU3065 病毒侵袭持续中【AC自动机】
    HDU2222 Keywords Search 【AC自动机】
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/12076546.html
Copyright © 2011-2022 走看看