zoukankan      html  css  js  c++  java
  • Logstash:处理多个input

    Logstash:处理多个input

    Logstash的整个pipleline分为三个部分:

    • input插件:提取数据。 这可以来自日志文件,TCP或UDP侦听器,若干协议特定插件(如syslog或IRC)之一,甚至是排队系统(如Redis,AQMP或Kafka)。 此阶段使用围绕事件来源的元数据标记传入事件。
    • filter 插件:插件转换并丰富数据
    • output插件: 将已处理的事件加载到其他内容中,例如ElasticSearch或其他文档数据库,或排队系统,如Redis,AQMP或Kafka。 它还可以配置为与API通信。 也可以将像PagerDuty这样的东西连接到Logstash输出。

    这里的input可以支持多个input,同时多个worker可以处理filter及output:

    Logstash配置文件

    Logstash的配置文件如下:

    # cat multi-input.conf
    
        input {
          file {
            path => "/Users/liuxg/data/multi-input/apache.log"
          	start_position => "beginning"
            sincedb_path => "/dev/null"
            # ignore_older => 100000
            type => "apache"
          }
        }
         
        input {
          file {
            path => "/Users/liuxg/data/multi-input/apache-daily-access.log"
          	start_position => "beginning"
            sincedb_path => "/dev/null"
            type => "daily"
          }
        }
         
        filter {
          	grok {
            	match => {
              		"message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
            	}
          	}
         
        	if[type] == "apache" {
        		mutate {
        	  		add_tag => ["apache"]
        	  	}
        	}
         
        	if [type] == "daily" {
        		mutate {
        			add_tag => ["daily"]
        		}
        	} 
        }
         
         
        output {
        	stdout {
        		codec => rubydebug
        	}
         
        	if "apache" in [tags] {
        	  	elasticsearch {
        	    	index => "apache_log"
        	    	template => "/Users/liuxg/data/apache_template.json"
        	    	template_name => "apache_elastic_example"
        	    	template_overwrite => true
        	  }	
        	}
         
        	if "daily" in [tags] {
        	  	elasticsearch {
        	    	index => "apache_daily"
        	    	template => "/Users/liuxg/data/apache_template.json"
        	    	template_name => "apache_elastic_example"
        	    	template_overwrite => true
        	  }	
        	}	
        }
    

    为了说明问题的方便,使用了两个input。它们分别对应不同的log文件。对于这两个input,也使用了不同的type来表示:apache和daily。尽管它们的格式是一样的,它们共同使用同样的一个grok filter,但是我们还是想分别对它们进行处理。为此,添加了一个tag。也可以添加一个field来进行区别。在output的部分,根据在filter部分设置的tag来对它们输出到不同的index里。

    运行Logstash

    可以通过如下的命令来运行:

    ./bin/logstash -f ~/data/multi-input/multi-input.conf
    

    当运行这个例子的时候,需要根据自己存放multi-input.conf文件的位置改变而改变上面的命令。

    根据显示的结果可以看出来daily的事件最早被处理及输出。接着apache的数据才开始处理。在实际的应用中,我们可能有不同的数据源,比如来自其它beats的监听某个端口的数据。

    可以在Kibana中看到我们最终的index数据.

  • 相关阅读:
    单精度和双精度
    @Transactional注解用法
    JPA No EntityManager with actual transaction available for current thread
    上传文件Request Entity Too Large解决办法
    PG数据库查看当前会话和结束会话
    Chrome浏览器记不住密码也不提示保存密码win10
    ARM平台VMP保护开发入门
    关于我
    HDU7072:Boring data structure problem——题解
    HDU7067:Just another board game——题解
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/12076546.html
Copyright © 2011-2022 走看看