zoukankan      html  css  js  c++  java
  • [logstash-input-http] 插件使用详解

    插件介绍

    Http插件是2.0版本才出现的新插件,1.x是没有这个插件的。这个插件可以帮助logstash接收其他主机或者本机发送的http报文。

    插件的原理很简单,它自己启动了一个ruby的服务器,用于接收Http请求。然后会把host(IP地址)和header相关的信息添加到event中。

    下面就看看这个插件如何使用吧!

    基本配置

    先看看默认的配置吧!

    http {}
    

    简单到心碎啊!其实有很多参数都是默认的...
    上面的配置其实相当于:

    http{
    	host => "0.0.0.0"
    	port => 8080
    	additional_codecs => {"application/json"=>"json"}
    	codec => "plain"
    	threads => 4
    	ssl => false
    }
    

    参数详解

    最主要的几个参数,也是Http中常见的属性:

    host

    默认是0.0.0.0,即所有的地址都可以发送到本机,从而接收Http信息。

    port

    是http插件中服务器运行的端口号。只要发送到“本机IP”:"该端口号"的数据都可以被http插件接收到。

    additional_codecs

    配置文本类型和codec的映射,如上面所示,默认配置了json文本对应使用json的codec。

    codec

    如果上面的映射集合中找不到文本类型对应的codec,那么会默认按照这个属性配置的codec解析。

    ssl

    是否开启SSL。

    threads

    ruby插件中服务器的启动线程,这里默认是4个。

    user、password、keystore、keystore_password

    这些都与http的认证有关系了,就不多说了。如果想要使用,再去参考文档吧!

    源码初探

    阅读插件的源码是为了更好的理解插件的使用,并且在出错的时候知道哪里出现了问题。Logstash的插件往往都有固定的书写格式,因此很容易看到插件的核心代码。

    在Input插件中,主要包含两个方法:

    public
      def register
        # register方法相当于初始化的构造方法
      end # def register
    
      # 主要的核心业务方法都在run中
      def run(queue)
        Stud.interval(@interval) do
    	  # 创建事件
          event = LogStash::Event.new("message" => @message, "host" => @host)
          # 装饰event对象
          decorate(event)
          # 放入队列中
          queue << event
        end # loop
      end # def run
    

    下面看看http的插件内容吧!

    首先看看register都做了什么吧!

     def register
        require "logstash/util/http_compressed_requests"
        # 创建Puma服务器
        @server = ::Puma::Server.new(nil) # we'll set the rack handler later
        # 下面的都是跟认证相关的....
        if @user && @password then
          token = Base64.strict_encode64("#{@user}:#{@password.value}")
          @auth_token = "Basic #{token}"
        end
        if @ssl
          if @keystore.nil? || @keystore_password.nil?
            raise(LogStash::ConfigurationError, "Settings :keystore and :keystore_password are required because :ssl is enabled.")
          end
          ctx = Puma::MiniSSL::Context.new
          ctx.keystore = @keystore
          ctx.keystore_pass = @keystore_password.value
          ctx.verify_mode = case @verify_mode
                            when 'peer'
                              Puma::MiniSSL::VERIFY_PEER
                            when 'force_peer'
                              Puma::MiniSSL::VERIFY_PEER | Puma::MiniSSL::VERIFY_FAIL_IF_NO_PEER_CERT
                            when 'none'
                              Puma::MiniSSL::VERIFY_NONE
                            end
          @server.add_ssl_listener(@host, @port, ctx)
        else
          @server.add_tcp_listener(@host, @port)
        end
        # 设置线程的数量
        @server.min_threads = 0
        @server.max_threads = @threads
        @codecs = Hash.new
        # 创建文本类型对应的codecs映射
        @additional_codecs.each do |content_type, codec|
          @codecs[content_type] = LogStash::Plugin.lookup("codec", codec).new
        end
      end # def register
    

    可以简单的把上面的代码归纳为:

    • 1 创建Puma服务器,Puma是一款ruby的高性能服务器。
    • 2 进行用户身份和密码的验证授权
    • 3 设置基本的线程信息
    • 4 创建codecs映射

    再看看run方法

    def run(queue)
        p = Proc.new do |req|
          begin
            remote_host = req['puma.socket'].peeraddr[3]
            REJECTED_HEADERS.each {|k| req.delete(k) }
            req = lowercase_keys(req)
            body = req.delete("rack.input")
            # 这里使用相应的codec解析对应的Body信息
            @codecs.fetch(req["content_type"], @codec).decode(body.read) do |event|
              # 这里遍历每个事件,然后添加host和headers信息
              event["host"] = remote_host
              event["headers"] = req
              decorate(event)
              queue << event
            end
            # 如果正常处理,则返回ok
            ['200', @response_headers, ['ok']]
          rescue => e
            @logger.error("unable to process event #{req.inspect}. exception => #{e.inspect}")
            ['500', @response_headers, ['internal error']]
          end
        end
    
        auth = Proc.new do |username, password|
          username == @user && password == @password.value
        end if (@user && @password)
    
        @server.app = Rack::Builder.new do
          use(Rack::Auth::Basic, &auth) if auth
          use CompressedRequests
          run(p)
        end
        @server.run.join
      end
    
      private
      def lowercase_keys(hash)
        new_hash = {}
        hash.each_pair do |k,v|
          new_hash[k.downcase] = v
        end
        new_hash
      end
    

    看了上面的代码,基本对http的原理有了一定了解吧!

  • 相关阅读:
    希腊字母写法
    The ASP.NET MVC request processing line
    lambda aggregation
    UVA 10763 Foreign Exchange
    UVA 10624 Super Number
    UVA 10041 Vito's Family
    UVA 10340 All in All
    UVA 10026 Shoemaker's Problem
    HDU 3683 Gomoku
    UVA 11210 Chinese Mahjong
  • 原文地址:https://www.cnblogs.com/xing901022/p/5256227.html
Copyright © 2011-2022 走看看