zoukankan      html  css  js  c++  java
  • [logstash-input-http] 插件使用详解

    插件介绍

    Http插件是2.0版本才出现的新插件,1.x是没有这个插件的。这个插件可以帮助logstash接收其他主机或者本机发送的http报文。

    插件的原理很简单,它自己启动了一个ruby的服务器,用于接收Http请求。然后会把host(IP地址)和header相关的信息添加到event中。

    下面就看看这个插件如何使用吧!

    基本配置

    先看看默认的配置吧!

    http {}
    

    简单到心碎啊!其实有很多参数都是默认的...
    上面的配置其实相当于:

    http{
    	host => "0.0.0.0"
    	port => 8080
    	additional_codecs => {"application/json"=>"json"}
    	codec => "plain"
    	threads => 4
    	ssl => false
    }
    

    参数详解

    最主要的几个参数,也是Http中常见的属性:

    host

    默认是0.0.0.0,即所有的地址都可以发送到本机,从而接收Http信息。

    port

    是http插件中服务器运行的端口号。只要发送到“本机IP”:"该端口号"的数据都可以被http插件接收到。

    additional_codecs

    配置文本类型和codec的映射,如上面所示,默认配置了json文本对应使用json的codec。

    codec

    如果上面的映射集合中找不到文本类型对应的codec,那么会默认按照这个属性配置的codec解析。

    ssl

    是否开启SSL。

    threads

    ruby插件中服务器的启动线程,这里默认是4个。

    user、password、keystore、keystore_password

    这些都与http的认证有关系了,就不多说了。如果想要使用,再去参考文档吧!

    源码初探

    阅读插件的源码是为了更好的理解插件的使用,并且在出错的时候知道哪里出现了问题。Logstash的插件往往都有固定的书写格式,因此很容易看到插件的核心代码。

    在Input插件中,主要包含两个方法:

    public
      def register
        # register方法相当于初始化的构造方法
      end # def register
    
      # 主要的核心业务方法都在run中
      def run(queue)
        Stud.interval(@interval) do
    	  # 创建事件
          event = LogStash::Event.new("message" => @message, "host" => @host)
          # 装饰event对象
          decorate(event)
          # 放入队列中
          queue << event
        end # loop
      end # def run
    

    下面看看http的插件内容吧!

    首先看看register都做了什么吧!

     def register
        require "logstash/util/http_compressed_requests"
        # 创建Puma服务器
        @server = ::Puma::Server.new(nil) # we'll set the rack handler later
        # 下面的都是跟认证相关的....
        if @user && @password then
          token = Base64.strict_encode64("#{@user}:#{@password.value}")
          @auth_token = "Basic #{token}"
        end
        if @ssl
          if @keystore.nil? || @keystore_password.nil?
            raise(LogStash::ConfigurationError, "Settings :keystore and :keystore_password are required because :ssl is enabled.")
          end
          ctx = Puma::MiniSSL::Context.new
          ctx.keystore = @keystore
          ctx.keystore_pass = @keystore_password.value
          ctx.verify_mode = case @verify_mode
                            when 'peer'
                              Puma::MiniSSL::VERIFY_PEER
                            when 'force_peer'
                              Puma::MiniSSL::VERIFY_PEER | Puma::MiniSSL::VERIFY_FAIL_IF_NO_PEER_CERT
                            when 'none'
                              Puma::MiniSSL::VERIFY_NONE
                            end
          @server.add_ssl_listener(@host, @port, ctx)
        else
          @server.add_tcp_listener(@host, @port)
        end
        # 设置线程的数量
        @server.min_threads = 0
        @server.max_threads = @threads
        @codecs = Hash.new
        # 创建文本类型对应的codecs映射
        @additional_codecs.each do |content_type, codec|
          @codecs[content_type] = LogStash::Plugin.lookup("codec", codec).new
        end
      end # def register
    

    可以简单的把上面的代码归纳为:

    • 1 创建Puma服务器,Puma是一款ruby的高性能服务器。
    • 2 进行用户身份和密码的验证授权
    • 3 设置基本的线程信息
    • 4 创建codecs映射

    再看看run方法

    def run(queue)
        p = Proc.new do |req|
          begin
            remote_host = req['puma.socket'].peeraddr[3]
            REJECTED_HEADERS.each {|k| req.delete(k) }
            req = lowercase_keys(req)
            body = req.delete("rack.input")
            # 这里使用相应的codec解析对应的Body信息
            @codecs.fetch(req["content_type"], @codec).decode(body.read) do |event|
              # 这里遍历每个事件,然后添加host和headers信息
              event["host"] = remote_host
              event["headers"] = req
              decorate(event)
              queue << event
            end
            # 如果正常处理,则返回ok
            ['200', @response_headers, ['ok']]
          rescue => e
            @logger.error("unable to process event #{req.inspect}. exception => #{e.inspect}")
            ['500', @response_headers, ['internal error']]
          end
        end
    
        auth = Proc.new do |username, password|
          username == @user && password == @password.value
        end if (@user && @password)
    
        @server.app = Rack::Builder.new do
          use(Rack::Auth::Basic, &auth) if auth
          use CompressedRequests
          run(p)
        end
        @server.run.join
      end
    
      private
      def lowercase_keys(hash)
        new_hash = {}
        hash.each_pair do |k,v|
          new_hash[k.downcase] = v
        end
        new_hash
      end
    

    看了上面的代码,基本对http的原理有了一定了解吧!

  • 相关阅读:
    linux下C++程序开发范例
    a list of compiler books — 汗牛充栋的编译器参考资料
    中国象棋将帅问题
    CPU利用率问题:操作系统原理和API
    算法性能分析
    MySQL时间分组查询
    在MongoDB的MapReduce上踩过的坑
    C++双缓冲多线程分析大文件词频
    MongoDB进行MapReduce的数据类型
    得到内网域管理员的5种常见方法<转>
  • 原文地址:https://www.cnblogs.com/xing901022/p/5256227.html
Copyright © 2011-2022 走看看