  • Commonly used Logstash plugins, explained

    Official documentation: https://www.elastic.co/guide/en/logstash-versioned-plugins/current/index.html

    Configuration file layout:
    # log ingestion: input {}
    # log filtering and processing: filter {}
    # log output: output {}
    # A log-parsing config file is divided into three sections -- input, filter, and output -- each of which hosts its own set of plugins. They are covered one by one below.
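    A minimal skeleton, then, looks like this (the stdin/stdout plugins here are just placeholders to make it runnable):

    input {
      stdin { }                           # read events from standard input
    }
    filter {
      # filter plugins go here
    }
    output {
      stdout { codec => rubydebug }       # print each parsed event to the terminal
    }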

    The input module

    Example 1

    # file is the common file-based input plugin; it has many options, pick what your use case needs
    input {
      file {
        path => "/var/lib/mysql/slow.log"   # file(s) to import; globs are allowed, e.g. /var/log/nginx/*.log
        exclude => "*.gz"                   # files to skip
        start_position => "beginning"       # read from the start of the file; "end" starts from the tail
        ignore_older => 0                   # skip files not modified within this many seconds; 0 means no limit
        sincedb_path => "/dev/null"         # where the last read position is recorded; /dev/null re-parses from the first line every time
        type => "mysql-slow"                # type field, marks what kind of log was ingested
      }
    }

    Example 2

    # redis is another common input plugin; again, pick options as needed
    input {
      redis {
        batch_count => 1              # number of events returned per EVAL call; 5 would fetch 5 log entries per request
        data_type => "list"           # mode the redis plugin works in
        key => "logstash-test-list"   # key to listen on
        host => "127.0.0.1"           # redis address
        port => 6379                  # redis port
        password => "123123"          # password, if authentication is enabled
        db => 0                       # redis database number, if the application uses one other than the default 0
        threads => 1                  # number of threads to run
      }
    }
    There are many more input plugins in common use; only two are shown here. Others include kafka, tcp, and so on -- a tcp sketch follows below.
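    As a small sketch of one of those, a tcp input might look like the following (the port and type values are arbitrary placeholders):

    input {
      tcp {
        port => 5000          # TCP port to listen on for incoming events
        mode => "server"      # act as the listening side of the connection
        codec => "json"       # assume each incoming line is a JSON document
        type => "tcp-input"   # tag events so later filters can match on them
      }
    }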

    The filter module

    Example

    filter {                              # there are many filter plugins; only the ones I have used are covered here
      # conditionals: regex match with =~ / !~, membership tests with in / not in, string comparison
      # with == / !=, and so on; any action may follow a match. Here matching lines are dropped, but
      # an if block can trigger anything, even a dedicated regex split applied only to matching lines.
      if ([message] =~ "regex here") { drop {} }
      multiline {                         # multi-line merge: some logs span several lines per entry, and
        pattern => "regex here"           # this plugin joins them; with these settings, the lines above a
        negate => true                    # pattern match are merged into a single event
        what => "previous"
      }
      grok {
        match => { "message" => "regex here" }   # regex-parse the log, extracting the fields and values worth recording
        remove_field => ["message"]              # delete fields that should not be recorded
      }
      date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]   # set @timestamp from a custom time field in the log; one can be generated if the log has none
        target => "@timestamp"                            # field to store the matched time in; @timestamp is the default
      }
      ruby {
        code => "event.timestamp.time.localtime"          # pin the timestamp to the local time zone
      }
    }
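    To make grok concrete, here is a hypothetical walk-through; the sample line, pattern, and field names are made up for illustration and use only core grok patterns:

    filter {
      grok {
        # splits a line such as "GET /index.html 200" into three named fields
        match => { "message" => "%{WORD:method} %{URIPATHPARAM:request} %{NUMBER:response}" }
      }
    }
    # input event:  { "message" => "GET /index.html 200" }
    # after grok:   adds "method" => "GET", "request" => "/index.html", "response" => "200"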

    The output module

    Example 1

    output {
      # stdout { codec => "rubydebug" }              # print the filtered events to the terminal
      elasticsearch {                                # export to ES, the most commonly used output plugin
        codec => "json"                              # export as JSON
        hosts => ["127.0.0.1:9200"]                  # ES address + port
        index => "logstash-slow-%{+YYYY.MM.dd}"      # target index; time variables may be used
        user => "admin"                              # account and password for ES security authentication;
        password => "xxxxxx"                         # omit when ES has no authentication
        flush_size => 500                            # default 500: logstash batches up 500 events before sending to ES
        idle_flush_time => 1                         # default 1s: if 500 events have not accumulated within 1s, send what is there anyway
      }
    }
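    While tuning filters it helps to temporarily switch to (or uncomment) the stdout output, so each event is printed with all of its extracted fields:

    output {
      stdout { codec => rubydebug }   # dump every event, fields and all, to the terminal
    }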

    Example 2

    output {
      redis {                            # redis output plugin; use the options below as needed
        batch => true                    # false sends one event per rpush; true sends a batch
        batch_events => 50               # number of events per rpush
        batch_timeout => 5               # time budget for one rpush
        codec => plain                   # codec applied to the output, avoiding a separate logstash filter
        congestion_interval => 1         # how often to run the congestion check
        congestion_threshold => 5        # max items allowed in the list; once full, block until other consumers drain it
        data_type => list                # use a list or publish to a channel
        db => 0                          # which redis database to use, default 0
        host => ["127.0.0.1:6379"]       # redis address and port; overrides the global port
        key => xxx                       # name of the list or channel
        password => xxx                  # redis password, not used by default
        port => 6379                     # global port, default 6379; ignored when host specifies one
        reconnect_interval => 1          # retry interval on failure, default 1s
        timeout => 5                     # connection timeout
        workers => 1                     # number of worker processes
      }
    }
    There are many more plugins in common use; see the official documentation for the rest.
    From the walkthrough above we now have the general logstash processing flow:
    input => filter => output
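    That flow can be exercised end to end from a terminal; a minimal sketch (the grok pattern here is a made-up placeholder):

    input {
      stdin { }                                                       # type log lines interactively
    }
    filter {
      grok {
        match => { "message" => "%{WORD:level} %{GREEDYDATA:msg}" }   # hypothetical two-field split
      }
    }
    output {
      stdout { codec => rubydebug }                                   # watch the parsed event appear
    }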
    Next, let's look at some complete applications:
    Elasticsearch slow-log

    input {    
    file {        
    path => ["/var/log/elasticsearch/private_test_index_search_slowlog.log"]        
    start_position => "beginning"        
    ignore_older => 0        
    # sincedb_path => "/dev/null"        
    type => "elasticsearch_slow"        
    }   
    }
    filter {
      grok {
        match => { "message" => "^\[(\d\d){1,2}-(?:0[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])\s+(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)\]\[(TRACE|DEBUG|WARN\s|INFO\s)\]\[(?<class>[a-z\.]+)\]\s\[(?<node>[a-z0-9\-\.]+)\]\s\[(?<index>[A-Za-z0-9\.\_\-]+)\]\[\d+\]\s+took\[(?<took>[\.\d]+(ms|s|m))\]\,\s+took_millis\[(\d)+\]\,\s+types\[(?<types>([A-Za-z\_]+|[A-Za-z\_]*))\]\,\s+stats\[\]\,\s+search_type\[(?<search_type>[A-Z\_]+)\]\,\s+total_shards\[\d+\]\,\s+source\[(?<source>[\s\S]+)\]\,\s+extra_source\[[\s\S]*\]\,\s*$" }
        remove_field => ["message"]
      }
      date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
      }
      ruby {
        code => "event.timestamp.time.localtime"
      }
    }
    
    output {     
    elasticsearch {         
    codec => "json"         
    hosts => ["127.0.0.1:9200"]         
    index => "logstash-elasticsearch-slow-%{+YYYY.MM.dd}"         
    user => "admin"         
    password => "xxxx"    
    }   
    }

    MySQL slow-log

    input {    
    file {        
    path => "/var/lib/mysql/slow.log"        
    start_position => "beginning"        
    ignore_older => 0        
    # sincedb_path => "/dev/null"        
    type => "mysql-slow"    
    }   
    }
    filter {
      if ([message] =~ "^(\/usr\/local|Tcp|Time)[\s\S]*")
      { drop {} }
      multiline {
        pattern => "^\#\s+Time\:\s+\d+\s+(0[1-9]|[12][0-9]|3[01]|[1-9])"
        negate => true
        what => "previous"
      }
      grok {
        match => { "message" => "^\#\sTime\:\s+\d+\s+(?<timestamp>%{TIME})\n+\#\s+User@Host\:\s+[A-Za-z0-9\_]+\[(?<user>[A-Za-z0-9\_]+)\]\s+@\s+(?<client>[A-Za-z0-9\_]+)\s+\[\]\n+\#\s+Query\_time\:\s+(?<query_time>[0-9\.]+)\s+Lock\_time\:\s+(?<lock_time>[0-9\.]+)\s+Rows\_sent\:\s+(?<rows_sent>\d+)\s+Rows\_examined\:\s+(?<rows_examined>\d+)(\n+|\n+use\s+(?<db>[A-Za-z0-9\_]+)\;\n+)SET\s+timestamp\=\d+\;\n+(?<query>[\s\S]+)$" }
        remove_field => ["message"]
      }
      date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
      }
      ruby {
        code => "event.timestamp.time.localtime"
      }
    }
    output {     
    elasticsearch {        
    codec => "json"        
    hosts => ["127.0.0.1:9200"]        
    index => "logstash-mysql-slow-%{+YYYY.MM.dd}"        
    user => "admin"        
    password => "xxxxx"    
    }   
    }

    Nginx access.log
    logstash ships with built-in grok patterns for nginx; with slight modification they are ready to use.
    Write the following into the /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns file:

    X_FOR (%{IPV4}|-)
    NGINXACCESS %{COMBINEDAPACHELOG} \"%{X_FOR:http_x_forwarded_for}\"
    ERRORDATE %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}
    NGINXERROR_ERROR %{ERRORDATE:timestamp}\s{1,}\[%{DATA:err_severity}\]\s{1,}(%{NUMBER:pid:int}#%{NUMBER}:\s{1,}\*%{NUMBER}|\*%{NUMBER}) %{DATA:err_message}(?:,\s{1,}client:\s{1,}(?<client>%{IP}|%{HOSTNAME}))(?:,\s{1,}server:\s{1,}%{IPORHOST:server})(?:, request: %{QS:request})?(?:, host: %{QS:server_ip})?(?:, referrer:\"%{URI:referrer})?
    NGINXERROR_OTHER %{ERRORDATE:timestamp}\s{1,}\[%{DATA:err_severity}\]\s{1,}%{GREEDYDATA:err_message}
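    Editing the vendored grok-patterns file does work, but an upgrade can overwrite it; an alternative sketch keeps the custom patterns in their own directory (the /etc/logstash/patterns path is an arbitrary choice) and points grok at it with the patterns_dir option:

    filter {
      grok {
        patterns_dir => ["/etc/logstash/patterns"]    # directory containing the custom pattern file above
        match => { "message" => "%{NGINXACCESS}" }
      }
    }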

    The log config file then looks like this:

    input {    
    file {    
    path => [ "/var/log/nginx/www-access.log" ]    
    start_position => "beginning"    
    # sincedb_path => "/dev/null"    
    type => "nginx_access"    
    }   
    }
    filter {    
    grok {         
    match => { "message" => "%{NGINXACCESS}"}    
    }    
    mutate {        
    convert => [ "response","integer" ]        
    convert => [ "bytes","integer" ]    
    }    
    date {        
    match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z"]    
    }       
    ruby {        
    code => "event.timestamp.time.localtime"    
    }   
    }
    output {    
    elasticsearch {        
    codec => "json"        
    hosts => ["127.0.0.1:9200"]        
    index => "logstash-nginx-access-%{+YYYY.MM.dd}"        
    user => "admin"        
    password => "xxxx"    
    }
    } 

    Nginx error.log

    input {    
    file {    
    path => [ "/var/log/nginx/www-error.log" ]    
    start_position => "beginning"    
    # sincedb_path => "/dev/null"    
    type => "nginx_error"    
    }
    }
    filter {    
    grok {        
    match => [  
    "message","%{NGINXERROR_ERROR}",                   
    "message","%{NGINXERROR_OTHER}"                 
    ]    
    }       
    ruby {        
    code => "event.timestamp.time.localtime"    
    }        
    date {         
    match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss"]     
    }
    }
    output {    
    elasticsearch {        
    codec => "json"        
    hosts => ["127.0.0.1:9200"]        
    index => "logstash-nginx-error-%{+YYYY.MM.dd}"        
    user => "admin"        
    password => "xxxx"    
    }   
    } 

    PHP error.log

    input {    
    file {        
    path => ["/var/log/php/error.log"]        
    start_position => "beginning"        
    # sincedb_path => "/dev/null"        
    type => "php-fpm_error"    
    }   
    }
    filter {
      multiline {
        pattern => "^\[(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}[\s\S]+"
        negate => true
        what => "previous"
      }
      grok {
        match => { "message" => "^\[(?<timestamp>(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}\s+%{TIME}?)\s+[A-Za-z]+\/[A-Za-z]+\]\s+(?<level>(?:[A-Z]{3}\s+[A-Z]{1}[a-z]{5,7}|[A-Z]{3}\s+[A-Z]{1}[a-z\s]{9,11}))\:\s+(?<error_message>[\s\S]+$)" }
        remove_field => ["message"]
      }
      date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
      }
      ruby {
        code => "event.timestamp.time.localtime"
      }
    }
    output {    
    elasticsearch {        
    codec => "json"        
    hosts => ["127.0.0.1:9200"]        
    index => "logstash-php-error-%{+YYYY.MM.dd}"        
    user => "admin"        
    password => "xxxxx"
    }
    }

    PHP-FPM slow-log

    input {    
    file {        
    path => ["/var/log/php-fpm/www.slow.log"]        
    start_position => "beginning"        
    # sincedb_path => "/dev/null"        
    type => "php-fpm_slow"    
    }   
    }
    filter {
      multiline {
        pattern => "^$"
        negate => true
        what => "previous"
      }
      grok {
        match => { "message" => "^\[(?<timestamp>(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}\s+%{TIME})\]\s+\[[a-z]{4}\s+(?<pool>[A-Za-z0-9]{1,8})\]\s+[a-z]{3}\s+(?<pid>\d{1,7})\n(?<slow_info>[\s\S]+$)" }
        remove_field => ["message"]
      }
      date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
      }
      ruby {
        code => "event.timestamp.time.localtime"
      }
    }
    output {    
    elasticsearch {        
    codec => "json"        
    hosts => ["127.0.0.1:9200"]        
    index => "logstash-php-fpm-slow-%{+YYYY.MM.dd}"        
    user => "admin"        
    password => "xxxx"
    }
    }

    The log parsing config files are kept together under /etc/logstash/conf.d here; any directory works, but one unified location is best.
    With multiple config files, do not run logstash like this:
    /opt/logstash/bin/logstash -f /etc/logstash/conf.d/ (or with a trailing *)
    That command concatenates the config files rather than running each one separately, and it will error out.
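    It is also worth syntax-checking each file before launching it; a sketch, assuming the --configtest flag of logstash 2.x (newer releases use -t / --config.test_and_exit):

    /opt/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_error.conf --configtest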
    When there are multiple config files, start each one individually:
    /opt/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_error.conf &
    That is tedious with many config files, though, and startup is slow besides, so here is a small
    test script to make things easier, for reference only:

    #!/bin/bash
    # directory holding the config files; change to suit
    conf_path=/etc/logstash/conf.d
    conf_name=$( ls ${conf_path} )
    case $1 in
    start)
        echo "-----------please wait.----------"
        echo "The start-up process is too slow."
        for cf in ${conf_name}
        do
            /opt/logstash/bin/logstash -f $conf_path/$cf > /dev/null 2>&1 &
            if [ $? -ne 0 ];then
                echo 'The '${cf}' start-up failed.'
            fi
            sleep 20
        done
        echo "start-up success."
    ;;
    stop)
        ps -ef |grep logstash |grep -v grep > /dev/null 2>&1
        if [ $? -eq 0 ];then
            ps -ef|grep logstash |grep -v grep |awk '{print $2}'|xargs kill -9 > /dev/null 2>&1
            sleep 2
            echo "Stop success."
        fi
    ;;
    restart)
        ps -ef |grep logstash |grep -v grep > /dev/null 2>&1
        if [ $? -eq 0 ];then
            ps -ef|grep logstash |grep -v grep |awk '{print $2}'|xargs kill -9 > /dev/null 2>&1
            sleep 3
            echo "Stop success."
        fi
        echo "-----------please wait.----------"
        echo "The start-up process is too slow."
        for cf in ${conf_name}
        do
            /opt/logstash/bin/logstash -f $conf_path/$cf > /dev/null 2>&1 &
            if [ $? -ne 0 ];then
                echo 'The '${cf}' start-up failed.'
            fi
            sleep 10
        done
        echo "start-up success."
    ;;
    *)
        echo "Usage: "$0" {start|stop|restart}"
        exit 1
    ;;
    esac
    

     Do not include "logstash" in the script's own file name, since the stop/restart branches kill every process matching "logstash"; it is saved here as log_stash.sh and invoked as ./log_stash.sh (start|stop|restart).

  • Original post: https://www.cnblogs.com/Dev0ps/p/9424891.html