zoukankan      html  css  js  c++  java
  • ELK之logstash

    下载安装(Redhat/Centos7)

    • rpm --import http://packages.elasticsearch.org/GPG-KEY-elasticsearch cat > /etc/yum.repos.d/logstash.repo <<EOF [logstash-5.0] name=logstash repository for 5.0.x packages baseurl=http://packages.elasticsearch.org/logstash/5.0/centos gpgcheck=1 gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch enabled=1 EOF yum clean all yum install logstash

    hello world

    • # bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

    logstash用不同的线程来实现对输入信息处理之后显示成我们希望的样子,logstash给每个线程都取了名字,输入的叫xx,过滤的叫xx

    数据在线程之间以事件的形式流传,不要叫行,因为logstash可以处理多行事件。

    logstash会给事件添加一些额外的信息,最重要的就是@timestamp,用来标记事件的发生时间。因为这个字段涉及到logstash的内部流转,所以必须是一个joda对象,如果尝试自己给一个字符串字段重命名为@timestamp的话,logstash会直接报错。所以请使用filters/data插件来管理这个特殊字段。

    另外的几个字段:

    • host标记发生位置
    • type标记事件的唯一类型
    • tags标记事件的某方面属性。这是一个数组,一个事件可以有多个标签

    你可以随意给事件添加字段或者从事件里删除字段。事实上事件就是一个Ruby对象,或者更简单的理解为就是一个哈希。

    注:每个logstash过滤插件都有四个方法:

    add_tag、remove_tag、add_field、remove_field

    配置语法

    logstash习惯用shipper、broker、indexer来描述数据流中不同进程的角色。

    语法

    在很多的运维场景中并没有用logstash作为shipper,或者说 没有用elasticsearch作为是数据存储,也就是没有indexer。

    logstash设计了自己的DSL--有点像Puppet的DSL,两者均是通过Ruby语言写的--包括区域、注释、数据类型(布尔值、字符串、数值、数组、哈希)。条件判断,字段引用等。

    区段

    logstash用{}来定义区域。区域内可以包括插件区域定义,可以在一个区域中定义多个插件。插件区域内则可以定义键值对设置。示例:

    input{

    stdin{}

    syslog{}

    }

    数据类型

    logstash支持少量的数据值类型

    • bool

    debug => true

    • string

    host => "hostname"

    • number

    port => 514

    • array

    match => ["datetime","UNIX","ISO8601"]

    • hash

    option => {

    key1 => "value1",

    key2 => "value2"

    }

    注:如果版本低于1.2.0,哈希的语法跟数组是一样的,示例:

    match => ["field1","pattern1","field2","pattern2"]

    字段引用

    字段是logstash::Event对象的属性。可以想象字段就像一个键值对。如果你想在Logstash中使用字段的值,只需要把字段的名字写在中括号中就行了,这就叫字段引用。

    对于嵌套字段(也就是多维哈希表,或者叫哈希的哈希),每层字段名都写在[]中。比如你可以从geoip获取到longitude值(这是个笨办法,实际上有单独的字段专门存这个数据的):

    [geoip][location][0]

    注:logstash的数组也支持倒序下标,及[geoip][location][-1]可以获取数组最后一个元素的值。

    Logstash还支持变量内插,在字符串里使用字段引用的方法是这样:

    “the longitude is %{[geoip][location][0]}”

    条件判断(condition)

    Logstash从1.3.0版本开始支持条件判断和表达式。

    表达式支持下面这些操作符:

    • ==,!=,<,>,<=,>=
    • =~(匹配正则),!~(不匹配正则)
    • in(包含),not in(不包含)
    • and,or,nand(非与),xor(非或)
    • ()(复合表达式),!()(对复合表达式取反)

    示例:

    if "_grokparsefailure" not in [tags] {
    } else if [status] !~ /^2dd/ or ( [url] == "/noc.gif" nand [geoip][city] != "beijing" ) {
    } else {
    }

    命令行参数 

    logstash提供了一个shell脚本叫logstash,方便快速运行。

    • -e

    即执行。事实上你可以不写任何具体配置,直接运行bin/logstash -e ‘ ’达到相同的效果,这个参数的默认值是下面这样的:

    input{

           stdin{  }    

    }

    output{

            stdout{  }

    }

    • --config或-f

    即文件。bin/logstash -f agent.conf,或者bin/logstash -f /etc/logstash.d/

    注:logstash列出目录下所有文件时是字母排序的。而logstash配置段的filter和output都是顺序执行的,所以顺序非常重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件,同时在配置中,严谨采用if判断限定不同日志的动作。

    • --configtest或-t

    即测试。用来测试logstash读取的配置文件是否存在语法错误。logstash配置语法是用grammar.treetop定义的,尤其是使用了上一条提到的读取目录方式的读者,尤其要提前测试。

    • --log或-l

    即日志。logstash默认输出日志到标准错误。生产环境下你可以通过bin/logstash -l log/logstash.log命令来统一存储日志。

    • --pipelinne-workers或-w

    运行filter和output的pipeline线程的数量。默认是CPU核数。

    • --pipeline-batch-delay或-u

    每个logstash pipeline线程,在打包批量日志的时候,最多等待毫秒数。默认是5ms。

    • --pluginpath或-P

    可以写自己的插件,然后用bin/logstash --pluginpath /path/to/own/plugins进行加载。

    • -verbose

    输出一定的调试日志。

    设置文件

    从logstash 5.0开始,新增了$LS_HOME/config/logstash.yml文件,可以将所有的命令行参数都通过YAML文件方式设置。同时为了反映命令行配置参数的层级关系,参数也都改成用.而不是-了。

    pipeline:

            workers:24

       bath:

        size:125

        delay:5

    plugin的安装

    从logstash1.5.0版本开始,logstash将所有插件都独立拆分成gem包。这样每个插件都可以独立更新,不用等待logstash自身做整体更新的时候才能使用。

    为了达到这个目标,logstash配置了专门的plugin管理命令

    plugin用法说明

    Usage:
        bin/logstash-plugin [OPTIONS] SUBCOMMAND [ARG] ...
    
    Parameters:
        SUBCOMMAND                    subcommand
        [ARG] ...                     subcommand arguments
    
    Subcommands:
        install                       Install a plugin
        uninstall                     Uninstall a plugin
        update                        Install a plugin
        list                          List all installed plugins
    
    Options:
        -h, --help                    print help

     示例

    bin/logstash-plugin list查看本机现在有多少插件可用(起始就是在vender/bundle/jruby/1.9/gems的目录下)

    若发布新模块logstash-output-webhdfs。只需要运行:bin/logstash-plugin install logstash-output-webhdfs

    假如只是升级的话:bin/logstash-plugin update logstash-input-tcp

    本地插件安装

    bin/logstash-plugin install /path/to/logstash-filter-crash.gem

    执行成功后在logstash-5.0.0目录下的Gemfile文件最后一行多出一段内容:

    gem "logstash-filter-crash", "1.1.0", :path => "vendor/local_gems/d354312c/logstash-filter-mweibocrash-1.1.0"

    同时Gemfile.jruby-1.9.lock文件开头也会多出一段内容:

    PATH
      remote: vendor/local_gems/d354312c/logstash-filter-crash-1.1.0
      specs:
        logstash-filter-crash (1.1.0)
          logstash-core (>= 1.4.0, < 2.0.0)

     长期运行

    标准service方式

    rpm发行包安装的用户:/etc/init.d/logstash脚本中,会加载/etc/init.d/functions库文件,利用其中的daemon函数,将logstash进程作为后台程序运行。

    所以只要确保配置文件存在于/etc/logstash/conf.d/目录下所有的conf结尾的文件且无其他文件,即可通过service logstash start启动

    最基础的nohup方式

    command
    command > /dev/null
    command > /dev/null 2>&1
    command &
    command > /dev/null &
    command > /dev/null 2>&1 &
    command &> /dev/null
    nohup command &> /dev/null

    想要维持一个长期后台运行的logstash,你需要同时在命令前面加nohup,后面加&。

    更优雅的SCREEN方式

    screen算是linux运维一个中高级技巧。通过screen命令创建的环境下运行的终端命令,其父进程不是sshd的登录会话,而是screen。这样就可以避免用户退出进程消失的问题,又随时能重新接管回终端继续操作。

    创建独立的screen命令:

    screen -dmS elkscreen_1

    接管连入创建的elkscreen_1

    screen -r elkscreen_1

    运行logstash之后,不要按Ctrl+C,而是按Ctrl+A+D,断开环境。想要重新接管,依然screen -r elkscreen_1即可。

    若创建了多个screen,可通过:

    screen -list

    最推荐的daemontools方式

    daemontools工具:daemontools、python实现的supervisord、perl实现的ubic、ruby实现的god等。

    以supervisord为例,因为出现比较早,可通过epel仓库直接安装:

    yum -y install supervisord --enablerepo=epel

     在/etc/supervisord.conf配置文件中添加内容,定义你要启动的程序:

    [program:elkpro_1]
    environment=LS_HEAP_SIZE=5000m
    directory=/opt/logstash
    command=/opt/logstash/bin/logstash -f /etc/logstash/pro1.conf -w 10 -l /var/log/logstash/pro1.log
    [program:elkpro_2]
    environment=LS_HEAP_SIZE=5000m
    directory=/opt/logstash
    command=/opt/logstash/bin/logstash -f /etc/logstash/pro2.conf -w 10 -l /var/log/logstash/pro2.log

    然后service supervisord start 即可。

    logstash会以supervisord子进程的身份运行,你还可以使用supervisordctl命令单独控制一系列logstash子进程中某一个进程的启停操作:

    supervisorctl stop elkpro_2

    • 输入插件

    • 读取文件(File)

    logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心logstash会漏过你的数据。

    sincedb文件中记录了每个被监听文件的inide、major number、minor number和pos。

    inode:在其他博文中已经阐述;

    major device number:可以看做是设备驱动程序,被同一设备驱动程序管理的设备有相同的major device number。这个数字实际是Kernel中device driver table的索引。这个表保存着不同的设备驱动程序;

    minor number:代表被访问的具体设备,也就是说Kernel根据major device number找到的设备驱动程序,然后再从minor device number获得设备位置属性。

     pos:包含发生错误的位置的绝对文件位置;

    • 配置示例

    input {
        file {
            path => ["/var/log/*.log", "/var/log/message"]
            type => "system"
            start_position => "beginning"
        }
    }
    • 解释

    有一些比较有用的配置项,可以用来指定File Watch库的行为:

      •   discover_interval

    logstash每隔多久去检查一次被监听的path下是否有新文件,默认15s。

      •   exclude

    不想被监听的文件可以排除出去,这里跟path一样支持glob展开。

    glob:所谓的 glob 模式是指 shell 所使用的简化了的正则表达式。星号(*)匹配零个或多个任意字符;[abc]匹配任何一个列在方括号中的字符(这个例子要么匹配一个 a,要么匹配一个 b,要么匹配一个 c);问号(?)只匹配一个任意字符;如果在方括号中使用短划线分隔两个字符,表示所有在这两个字符范围内的都可以匹配(比如 [0-9] 表示匹配所有 0 到 9 的数字)。

      •   close_older

    一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3600s。

      •   ignore_older

    在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86400s。

      •   sincedb_path

    如果你不想用默认的$HOME/.sincedb(windows平台上在C:windowsSystem32configsystemprofile.sincedb),可以通过这个配置定义sincedb文件到其他位置。

      •   sincedb_write_interval

    logstash每隔多久写一次sincedb文件,默认是15s。

      •   stat_interval

    logstash每隔多久检查一次被监听文件状态(是否有更新,默认是1s)

      •   start_position

    logstash从什么位置开始读取文件数据,默认是结束位置,也就是说logstash进程会以类似tail -f的形式运行。如果你是要导入原有数据,把这个设定成“beginning”,logstash进程就从头开始读取,类似less +F的形式运行。

    • 注意

    1、通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的@timestamp字段值。

    2、FileWatch只支持绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。

    3、LogStash::Input::File只是在进程运行的注册阶段初始化一个FileWatch对象,所以它不能支持类似fluentd那样的path=> “/path/to/%{+yyyy/mm/dd/hh}.log”写法。达到相同目的,你只能写成path=>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。

    4、在单个input/file中监听的文件数量太多的话,每次启动扫描构建监听队列会消耗较多的时间。给使用者的感觉好像读取不到一样,这是正常现象。

    5、start_position仅在该文件从未被监听过的时候起作用,如果sincedb文件中已经有这个文件的inode记录了,那么logstash依然会从记录的pos开始读取数据。

    6、因为windows平台上没有inode概念,logstash某些版本在windows平台上监听文件不是很靠谱,windows平台上,推荐考虑使用nxlog作为收集端。

    • 标准输入(Stdin)

    input {
        stdin {
            add_field => {"key" => "value"}
            codec => "plain"
            tags => ["add"]
            type => "std"
        }
    }
    • 运行结果

    用上面的新stdin设置重新运行一次最开始的hello world示例。建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入“hello world”并回车,你会在终端看到如下的输出:

    {
           "message" => "hello world",
          "@version" => "1",
        "@timestamp" => "2014-08-08T06:48:47.789Z",
              "type" => "std",
              "tags" => [
            [0] "add"
        ],
               "key" => "value",
              "host" => "raochenlindeMacBook-Air.local"
    }
    • 解释

    type和tags是logstash事件中两个特殊的字段。通常说我们会在输入区段中通过type来标记事件类型----我们肯定是提前能知道这个事件属于什么类型的。而tags这是在数据处理过程中,由具体的插件来添加或者删除的。

    最常见的用法像下面这样:

    input {
        stdin {
            type => "web"
        }
    }
    filter {
        if [type] == "web" {
            grok {
                match => ["message", %{COMBINEDAPACHELOG}]
            }
        }
    }
    output {
        if "_grokparsefailure" in [tags] {
            nagios_nsca {
                nagios_status => "1"
            }
        } else {
            elasticsearch {
            }
        }
    }
    • 读取Syslog数据

    syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科----syslog几乎是唯一可行的办法。

    这里只讲如何把logstash配置成一个syslog服务器来接收数据。有关rsyslog的用法,稍后的类型项目中,会有更详细的介绍。

    • 配置示例

    input {
      syslog {
        port => "514"
      }
    }
    • 运行结果

    作为简单的测试,先暂停本机的syslog或rsyslog进程,然后启动logstash进程(这样就不会有端口冲突的问题)。现在,本机的syslog就会默认发送到logstash里了。我们可以用自带的logger命令行工具发送一条“hello world”信息到syslog里,即logstash里面。看到logstash输出像下面这样。

    {
               "message" => "Hello World",
              "@version" => "1",
            "@timestamp" => "2014-08-08T09:01:15.911Z",
                  "host" => "127.0.0.1",
              "priority" => 31,
             "timestamp" => "Aug  8 17:01:15",
             "logsource" => "raochenlindeMacBook-Air.local",
               "program" => "com.apple.metadata.mdflagwriter",
                   "pid" => "381",
              "severity" => 7,
              "facility" => 3,
        "facility_label" => "system",
        "severity_label" => "Debug"
    }
    • 解释

    Logstash是用UDPSocket,TCPServer和Logstash::Filter::Grok来实现Logstash::Inputs::Syslog的。所以你起始可以直接用logstash配置实现一样的效果:

    input {
      tcp {
        port => "8514"
      }
    }
    filter {
      grok {
        match => ["message", "%{SYSLOGLINE}" ]
      }
      syslog_pri { }
    }
    • 最佳实践

    建议在使用Logstash::Inputs::Syslog的时候走TCP协议来传输数据;

    因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。

    如果你已经使用UDP监听收集日志,用下行命令检查你的UDP接收队列的大小:

    netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'
    Recv-Q
    228096

    228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了。

    强 烈建议使用Logstash::Input::Tcp和Logstash::filters::Grok配合实现同样的syslog功能。

    虽然Logstash::Input::Syslog在使用TCPServer的时候可以采用多线程处理的接收,但是在同一个客户端数据处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降,经过测试,TCPServer每秒可以接收50000条数据,而在同一线程中启用grok后每秒只能处理5000条,再加上date只能到500条。

    才将这两步拆分到filters阶段后,logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,总体处理性能可以达到30000条数据。

    注:测试采用 logstash 作者提供的 yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000 命令。出处见:https://github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile

    •  贴士

     如果实在没办法切换到TCP协议,你可以自己写程序或者使用其他基于异步IO框架(比如libev)的项目。下面是一个简单的异步IO实现UDP监听数据输入Eleasticsearch的示例:

    https://gist.github.com/chenryn/7c922ac424324ee0d695

    • 读取网络数据(TCP)

    未来你可能会用到redis服务器或者其他消息队列系统作为logstash broker的角色。不过logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。

    小贴士:

    虽然Logstash::Input::TCP用Ruby的Socket和Openssl库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是建议在生产环境中换用其他消息队列的原因。

    • 配置示例

    input {
        tcp {
            port => 8888
            mode => "server"
            ssl_enable => false
        }
    }
    • 常见场景

    目前看来,Logstash::Input::TCP最常见的用法就是配合nc命令导入旧数据。在启动logstash进程后,在另一个终端运行如下的命令即可导入数据:

    # nc 127.0.0.1 8888 < olddata 

    这种做法比用Logstash::Input::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。

    未完待续……

  • 相关阅读:
    返回图片宽高比
    3.1/3.2图片上传类
    php获取图片的拍摄及其他数据信息
    上传类
    pathinfo()的用法
    上传并压缩图片
    将数组转化为键值对
    css3判断某个li标签
    禁止滚动条/启用滚动条
    Keepalived + haproxy双机高可用方案
  • 原文地址:https://www.cnblogs.com/cf532088799/p/7732973.html
Copyright © 2011-2022 走看看