ELK Cluster: Logstash (5)

How Logstash Works

     

Logstash processes events in three stages: inputs → filters → outputs. It is a tool that receives, processes, and forwards logs. It supports system logs, web-server logs, error logs, application logs — essentially any type of log that can be emitted.

Logstash's data-processing flow consists of three parts: Inputs, Filters, and Outputs; in addition, Codecs can be used inside Inputs and Outputs to handle data formats. All four parts exist as plugins. By writing a pipeline configuration file, the user selects the input, filter, output, and codec plugins to use, implementing specific data collection, data processing, and data output behavior.

Inputs: acquire data from a data source; common plugins include file, syslog, redis, and beats.
Filters: process the data, e.g. format conversion and field derivation; common plugins include grok, mutate, drop, clone, and geoip.
Outputs: emit the data; common plugins include elasticsearch, file, graphite, and statsd.
Codecs: codecs (encoding plugins) are not a separate stage; they are modules used inside input and output plugins to transform data encodings. Common plugins include json and multiline. Logstash is not just an input | filter | output data flow, but an input | decode | filter | encode | output data flow — the codec is what decodes and encodes events. A minimal pipeline exercising all four plugin types is sketched below.
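As an illustration (a sketch added here, not from the original), a pipeline that uses all four plugin types — a codec on the input, a filter in the middle, and a codec on the output:

input {
  stdin {
    codec => json          # decode: parse each incoming line as JSON
  }
}
filter {
  mutate {
    add_field => { "pipeline" => "demo" }   # trivial processing step
  }
}
output {
  stdout {
    codec => rubydebug     # encode: pretty-print the event for inspection
  }
}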

Input: feeds data into Logstash.

Some common inputs:

file: reads from files on the filesystem, similar to the tail -f command

syslog: listens on port 514 for syslog messages and parses them according to RFC 3164

redis: reads from a redis service

beats: reads from Filebeat

Filters: intermediate processing; operate on the data.

Some common filters:

grok: parses arbitrary text. Grok is Logstash's most important plugin; its main job is turning plain-text strings into concrete structured data, used together with regular expressions. It ships with more than 120 built-in patterns (see the sketch after this list).

Official grok patterns: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
Grok online debugger: https://grokdebug.herokuapp.com/

mutate: transforms fields, e.g. deleting, replacing, modifying, or renaming them.

drop: discards a subset of events without processing them.

clone: copies an event; fields can be added or removed in the process.

geoip: adds geographic information (used by Kibana for graphical display).
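As a quick illustration of grok (a sketch; the sample line and field names are invented here), built-in patterns pull structured fields out of a plain-text line such as "55.3.244.1 GET /index.html 15824 0.043":

filter {
  grok {
    # IP, WORD, URIPATHPARAM and NUMBER are built-in patterns;
    # client, method, request, bytes and duration become event fields
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}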

Outputs: outputs are the final-stage components of the Logstash processing pipeline. An event can pass through multiple outputs during processing, but once all outputs have finished, the event's life cycle is complete.

Some common outputs:

elasticsearch: stores data efficiently and makes it convenient and simple to query.

file: saves event data to a file.

graphite: sends event data to Graphite, a popular open-source component for storing and graphing metrics.

Codecs: codecs are stream-based filters that can be configured as part of an input or output. Codecs let you easily split apart data that arrives already serialized.

Some common codecs:

json: encodes/decodes data in JSON format.

multiline: merges multiple lines into a single event — for example, a Java exception message and its stack trace.
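A sketch of a multiline codec for Java stack traces (the path is hypothetical) — continuation lines that start with whitespace are folded into the previous event:

input {
  file {
    path => "/var/log/app/app.log"   # hypothetical application log
    codec => multiline {
      pattern => "^\s"               # lines beginning with whitespace...
      what => "previous"             # ...are appended to the previous line's event
    }
  }
}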

Installing Logstash

Logstash's role
  Logstash mainly filters and processes logs; it can also collect them, but log collection is generally not done with Logstash
  Input support: standard input, text log files, etc.
  Output support: standard output, output to ES, etc.
Installing Logstash
  yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y
  yum localinstall logstash-7.6.2.rpm
    
Update Logstash's JVM configuration file jvm.options:
  -Xms200M
  -Xmx200M
    
The simplest Logstash configuration, /etc/logstash/conf.d/logstash.conf:
    input{
      stdin{}
    }
    output{
      stdout{
        codec=>rubydebug
      }
    }
    
Starting and testing Logstash
  /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
  Type some characters and watch the output
Installation
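For reference (shape approximated here, not captured from a real run), typing hello should produce a rubydebug event roughly like:

{
       "message" => "hello",
      "@version" => "1",
    "@timestamp" => 2020-08-01T06:53:35.000Z,
          "host" => "sjg1"
}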
Hands-on: Logstash reading a log file
Install nginx to provide log input
  yum install nginx -y
    
Edit nginx's systemd unit file /usr/lib/systemd/system/nginx.service and delete the lines below:
    KillSignal=SIGQUIT
    TimeoutStopSec=5
    KillMode=process
    PrivateTmp=true
    
Things to watch when collecting logs with Logstash
  By default Logstash runs as the logstash user, so that user needs read permission on the logs
  New log lines must be produced; on startup, old log content is not read by default
    
Input that reads the nginx log
    input {
      file {
        path => "/var/log/nginx/access.log"
      }
    }
    output{
      stdout{
        codec=>rubydebug
      }
    }
    
Starting Logstash
  systemctl enable logstash
  systemctl restart logstash
  Watch the log: /var/log/messages
Logstash reading the nginx log
    input {
      file {
        path => "/var/log/nginx/access.log"
      }
    }
    output {
      elasticsearch {
        hosts => ["http://xxx:9200", "http://xxx:9200"]
        user => "elastic"
        password => "sjgpwd"
        index => "sjgnginx-%{+YYYY.MM.dd}"
      }
    }
    
Reloading the Logstash configuration (a full restart of Logstash is slow):
kill -1 pid
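Alternatively (a note added here, not in the original), Logstash can reload the pipeline automatically whenever the config file changes:

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf --config.reload.automatic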
Logstash sending logs to ES
Simulate nginx log generation
    while true;do 
      curl 127.0.0.1/sjgsjg
      curl 192.168.238.90/sjg666
      sleep 5
    done
    
Regex extraction of Nginx logs with Logstash
Why extract? A whole log line can't be analyzed as one string; individual fields must be extracted
  To see which IPs generate the most traffic
  To analyze Nginx response status codes
    
Nginx log format
      192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /sjg666 HTTP/1.1" 404 3650 "-" "Chrome xxx" "-"
    
Nginx log format configuration
      log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                          '$status $body_bytes_sent "$http_referer" '
                          '"$http_user_agent" "$http_x_forwarded_for"';
$remote_addr  the client address of the request
$remote_user  the user authenticated by nginx
$time_local  the access time
$request  the request method, path, and protocol
$status  the response status code
$body_bytes_sent  bytes sent in the response body
$http_referer  the page from which the request was referred
$http_user_agent  the client software (User-Agent header)
$http_x_forwarded_for  the real client IP when behind a proxy (X-Forwarded-For header)
Nginx log format explained

Filter: grok regular expressions

    
    

Nginx log format
    192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /sjg666 HTTP/1.1" 404 3650 "-" "Chrome xxx" "-"


Grok is the extraction workhorse; it requires a grasp of regular expressions. Use Kibana's Grok Debugger to verify what you extract. You can write your own regexes (recommended) or use the built-in patterns (simpler):
    /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

Grok hand-written regex syntax: (?<field_name>your_regex), e.g. (?<remote_addr>\d+\.\d+\.\d+\.\d+)
Built-in pattern syntax: %{BUILTIN_PATTERN:field_name}, e.g. %{IP:remote_addr}

Full built-in extraction for the nginx log line above:
    %{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}

Mixed syntax:
    (?<remote_addr>\d+\.\d+\.\d+\.\d+) - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\]

Ordinary regular expression symbols:
    . matches any single character; * means the preceding character repeats 0 or more times
    [abc] matches any one character inside the brackets; [^abc] matches any character not inside them
    [0-9] digits, [a-z] lowercase letters, [A-Z] uppercase letters, [a-zA-Z] all letters, [a-zA-Z0-9] letters and digits
    [^0-9] non-digits
    ^xx anchors at the start, xx$ anchors at the end
    \s whitespace, \S non-whitespace, \d a digit

Extended regular expressions add, on top of the ordinary ones:
    ? preceding character occurs 0 or 1 time; + preceding character occurs 1 or more times
    {a} exactly a repetitions; {a,b} between a and b repetitions
    {,b} 0 to b repetitions; {a,} a or more repetitions
    string1|string2 matches string1 or string2
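Applied to the sample line, the built-in pattern yields fields roughly like this (shape approximated here):

{
  "remote_addr"     => "192.168.238.90",
  "time_local"      => "01/Aug/2020:14:53:35 +0800",
  "method"          => "GET",
  "request"         => "/sjg666",
  "status"          => "404",
  "body_bytes_sent" => "3650",
  "http_user_agent" => "\"Chrome xxx\""   # %{QS} keeps the quotes; they are stripped with gsub later
}

The grok-patterns file cited above contains, among others: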
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b

POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# URN, allowing use of RFC 2141 section 2.3 reserved characters
URN urn:[0-9A-Za-z][0-9A-Za-z-]{0,31}:(?:%[0-9a-fA-F]{2}|[0-9A-Za-z()+,.:=@;$_!*'/?#-])+

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
IPORHOST (?:%{IP}|%{HOSTNAME})
HOSTPORT %{IPORHOST}:%{POSINT}

# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (/([\w_%!$@:.,+~-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[APMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG [\x21-\x5a\x5c\x5e-\x7e]+
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:

# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
Grok built-in patterns
    input {
      file {
        path => "/var/log/nginx/access.log"
      }
    }
    filter {
      grok {
        match => {
          "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) [%{HTTPDATE:time_local}] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
        }
        remove_field => ["message"]
      }
    }
    output {
      elasticsearch {
        hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
        user => "elastic"
        password => "sjgpwd"
        index => "sjgnginx-%{+YYYY.MM.dd}"
      }
    }
Regex extraction of the nginx log
Handling the exclamation-mark warning in Kibana
  Refresh the Kibana index pattern
  Operations on Kibana index patterns do not affect the data; deleting and recreating them is safe
    
Special field handling in Logstash: substitution and type conversion
http_user_agent contains double quotes, which need removing
    filter {
      grok {
        match => {
          "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) [%{HTTPDATE:time_local}] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
        }
        remove_field => ["message"]
      }
      mutate {
        gsub => [ "http_user_agent",'"',"" ]
      }
    }
    
Converting strings to integers in Logstash
      mutate{
        gsub => [ "http_user_agent",'"',"" ]
        convert => { "status" => "integer" }
        convert => { "body_bytes_sent" => "integer" }
      }
Field substitution and type conversion
Replacing the @timestamp timestamp in Logstash
Simulate user access to Nginx
    while true;do 
      curl 192.168.238.90/sjg666
      curl 127.0.0.1
      sleep 2
    done
    
Scenario
  Suppose we want to analyze the users' access logs from yesterday

Have Logstash analyze all the Nginx logs, which exposes the problem
    input {
      file {
        path => "/var/log/nginx/access.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
    
Two kinds of time
  The time the log was shipped — useless for analyzing the logs
  The user's access time, recorded in the log line — analysis is accurate only when this is used

Use the user's access time as the basis; its format is 01/Aug/2020:10:34:20 +0800
    filter {
      grok {
        match => {
          "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) [%{HTTPDATE:time_local}] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
        }
        remove_field => ["message"]
      }
      date {
        match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
        target => "@timestamp"
      }
    }
    
If logs carry different time formats, the format must match when overwriting:
20/Feb/2019:14:50:06 -> dd/MMM/yyyy:HH:mm:ss
2016-08-24 18:05:39,830 -> yyyy-MM-dd HH:mm:ss,SSS
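For the second format, the date filter would look like this (a sketch; the source field name timestamp is an assumption):

date {
  match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]   # e.g. 2016-08-24 18:05:39,830
  target => "@timestamp"
}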
    
Manually count Nginx requests and compare with what the web UI shows:
cat /var/log/nginx/access.log |awk '{print $4}'|sed 's/:[0-9][0-9]$//g'|sort |uniq -c
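The command strips the seconds from the timestamp field and counts requests per minute, producing output roughly like this (counts invented for illustration):

     12 [01/Aug/2020:14:53
      9 [01/Aug/2020:14:54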
    
Delete time_local once the timestamp has been overwritten
      mutate {
        gsub => [ "http_user_agent",'"',"" ]
        convert => { "status" => "integer" }
        convert => { "body_bytes_sent" => "integer" }
        remove_field => ["time_local"]
      }
Replacing the timestamp
Handling regex extraction failures in Logstash
Change Logstash back to analyzing only the latest logs
    input {
      file {
        path => "/var/log/nginx/access.log"
      }
    }
    
A case where regex extraction fails:
echo "sjgmethods xxx xxx" >> /var/log/nginx/access.log
The event gets tagged: _grokparsefailure
    
Send events whose regex extraction failed to a separate index
    output {
      if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
        elasticsearch {
          hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
          user => "elastic"
          password => "sjgpwd"
          index => "sjgnginx-%{+YYYY.MM.dd}"
        }
      }
      else{
        elasticsearch {
          hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
          user => "elastic"
          password => "sjgpwd"
          index => "sjgfail-%{+YYYY.MM.dd}"
        }
      }
    }
    
Basic use of Kibana visualizations
Simulate data
      while true;do 
        curl 192.168.238.90/sjg666; 
        curl 127.0.0.1; 
        sleep 2; 
      done
    
Discover area
  View traffic over time: requests per minute
  Query by a given field
  View statistics for a single field

When building Kibana visualizations, choose a terms aggregation to view the corresponding data
  Creating a pie chart: pie_remote_addr
  Creating a table: table_remote_addr

Creating a Kibana dashboard: sjg_dash
  Create the dashboard
  Add visualizations to the dashboard

Grafana is recommended for display
Handling regex extraction failures
Analyzing Linux system logs with Logstash
Default log format
  Aug  3 18:37:57 sjg1 sshd[1318]: Accepted password for root from xxx port 49205 ssh2
  The timestamp has no year field
    
System log configuration in /etc/rsyslog.conf; restart rsyslog afterwards
$template sjgformat,"%$NOW% %TIMESTAMP:8:15% %hostname% %syslogtag% %msg%\n"
$ActionFileDefaultTemplate sjgformat
    
Resulting log format
    2020-08-03 18:47:34 sjg1 sshd[1522]: Accepted password for root from 58.101.14.103 port 49774 ssh2
    %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)
    
Add read permission
chmod +r /var/log/secure
    
Extract the secure log; messages and other logs work on the same principle
    input {
      file {
        path => "/var/log/secure"
      }
    }
    filter {
      grok {
        match => {
          "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
        }
        remove_field => ["message"]
      }
      date {
        match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
        target => "@timestamp"
      }
      mutate {
        remove_field => ["timestamp"]
      }
    }
    output {
      elasticsearch {
        hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
        user => "elastic"
        password => "sjgpwd"
        index => "sjgsecure-%{+YYYY.MM.dd}"
      }
    }
Extracting and analyzing ssh logs
    #input{
    #    file {
    #    path => "/var/log/nginx/access.log"
    #    type => "nginx"
    #  }
    #    file {
    #    path => "/var/log/secure"
    #    type => "system"
    #  }
    #
    #}
    input {
      beats {
        host => '0.0.0.0'
        port => 5044
      }
    }
    filter {
      if [type] == "nginx" {
        grok {
          match => {
            "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) [%{HTTPDATE:time_local}] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
          }
          remove_field => ["message"]
        }
        date {
          match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
          target => "@timestamp"
        }
        mutate {
          gsub => [ "http_user_agent",'"','' ]
          convert => { "status" => "integer" }
          convert => { "body_bytes_sent" => "integer" }
          remove_field => ["time_local"]
        }
      }
       if [type] == "system"{
         grok {
          match => {
            "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
          }
          remove_field => ["message"]
        }
        date {
          match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
          target => "@timestamp"
        }
        mutate {
          remove_field => ["timestamp"]
        }
      }
    }
    output {
      if [type] == "nginx"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testnginx-%{+YYYY.MM.dd}"
        }
      }
      if [type] == "system"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testsystem-%{+YYYY.MM.dd}"
        }
      }
    }
Logstash extracting multiple logs
    input {
      beats {
        host => '0.0.0.0'
        port => 5044
      }
    }
    filter {
      if [type] == "nginx" {
        grok {
          match => {
            "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) [%{HTTPDATE:time_local}] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
          }
          remove_field => ["message"]
        }
        date {
          match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
          target => "@timestamp"
        }
        mutate {
          gsub => [ "http_user_agent",'"','' ]
          convert => { "status" => "integer" }
          convert => { "body_bytes_sent" => "integer" }
          remove_field => ["time_local"]
        }
      }
       if [type] == "system"{
         grok {
          match => {
            "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
          }
          remove_field => ["message"]
        }
        date {
          match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
          target => "@timestamp"
        }
        mutate {
          remove_field => ["timestamp"]
        }
      }
       if [type] == "mysql"{
         grok {
          match => {
            }
          remove_field => ["message"]
    
         }
         date {
           match => ["timestamp", "yyMMdd HH:mm:ss"]
           target => "@timestamp"
         }
         mutate {
           remove_field => ["timestamp"]
         }
    
       }
    
       if [type] == "tomcat" {
        grok {
          match => {
            "message" => '(?<timestamp>d+-[a-zA-Z]+-d+ d+:d+:d+.d+) %{NOTSPACE:loglevel} %{NOTSPACE:thread}(?<loginfo>(s+.*)+)'
          }
          remove_field => ["message"]
        }
        date {
          match => ["time_local", "dd-MMM-yyyy HH:mm:ss.SSS"]
          target => "@timestamp"
        }
        mutate {
          remove_field => ["timestamp"]
        }
      }
    
    }
    output {
      if [type] == "mysql"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testmysql-%{+YYYY.MM.dd}"
        }
      }
      if [type] == "tomcat"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testtomcat-%{+YYYY.MM.dd}"
        }
      }
      if [type] == "nginx"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testnginx-%{+YYYY.MM.dd}"
        }
      }
      if [type] == "system"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testsystem-%{+YYYY.MM.dd}"
        }
      }
    }
Logstash receiving Filebeat logs from multiple hosts
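The Filebeat side is not shown in the original; a minimal sketch of what it might look like (the host, paths, and custom type field are assumptions — fields_under_root places type at the event root so the conditionals above can see it):

filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log
  fields:
    type: nginx          # matched by if [type] == "nginx" in the pipeline
  fields_under_root: true
- type: log
  paths:
    - /var/log/secure
  fields:
    type: system
  fields_under_root: true
output.logstash:
  hosts: ["172.17.166.217:5044"]   # the beats input above listens on port 5044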
    input {
      kafka {
        bootstrap_servers => "172.17.166.217:9092,172.17.166.218:9092,172.17.166.219:9092"
        topics => ["sjg"]
        group_id => "sjggroup"
        codec => "json"
      }
    }
    filter {
      if [type] == "nginx" {
        grok {
          match => {
            "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) [%{HTTPDATE:time_local}] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
          }
          remove_field => ["message"]
        }
        date {
          match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
          target => "@timestamp"
        }
        mutate {
          gsub => [ "http_user_agent",'"','' ]
          convert => { "status" => "integer" }
          convert => { "body_bytes_sent" => "integer" }
          remove_field => ["time_local"]
        }
      }
       if [type] == "system"{
         grok {
          match => {
            "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
          }
          remove_field => ["message"]
        }
        date {
          match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
          target => "@timestamp"
        }
        mutate {
          remove_field => ["timestamp"]
        }
      }
       if [type] == "mysql"{
         grok {
          match => {
            }
          remove_field => ["message"]
    
         }
         date {
           match => ["timestamp", "yyMMdd HH:mm:ss"]
           target => "@timestamp"
         }
         mutate {
           remove_field => ["timestamp"]
         }
    
       }
    
       if [type] == "tomcat" {
        grok {
          match => {
            "message" => '(?<timestamp>d+-[a-zA-Z]+-d+ d+:d+:d+.d+) %{NOTSPACE:loglevel} %{NOTSPACE:thread}(?<loginfo>(s+.*)+)'
          }
          remove_field => ["message"]
        }
        date {
          match => ["time_local", "dd-MMM-yyyy HH:mm:ss.SSS"]
          target => "@timestamp"
        }
        mutate {
          remove_field => ["timestamp"]
        }
      }
    
    }
    output {
      if [type] == "mysql"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testmysql-%{+YYYY.MM.dd}"
        }
      }
      if [type] == "tomcat"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testtomcat-%{+YYYY.MM.dd}"
        }
      }
      if [type] == "nginx"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testnginx-%{+YYYY.MM.dd}"
        }
      }
      if [type] == "system"{
        elasticsearch {
          hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
          user => "elastic"
          password => "1.Q1.Q1.Q"
          index => "testsystem-%{+YYYY.MM.dd}"
        }
      }
    }
Logstash consuming log messages from a Kafka queue
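In the Kafka pipeline the shippers write to the topic instead of directly to Logstash; a minimal sketch of the Filebeat side (an assumption, matching the brokers and sjg topic above):

output.kafka:
  hosts: ["172.17.166.217:9092", "172.17.166.218:9092", "172.17.166.219:9092"]
  topic: sjg             # the kafka input above subscribes to this topic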
    input {
      kafka {
        bootstrap_servers => "172.17.166.217:9092,172.17.166.218:9092,172.17.166.219:9092"
        topics => ["test2"]
        group_id => "test2group"
        codec => "json"
      }
    }
    filter {
      mutate {
        remove_field => ["agent","ecs","log","input","[host][os]"]
      }
    }
    #output {
    #  elasticsearch {
    #    hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200", "http://172.17.166.219:9200"]
    #    user => "elastic"
    #    password => "1.Q1.Q1.Q"
    #    index => "testmetric-%{+YYYY.MM.dd}"
    #  }
    #}
    output {
      if [service][type] == "mysql" {
        elasticsearch {
          hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
          user => "elastic"
          password => "sjgpwd"
          index => "sjgmysql-%{+YYYY.MM.dd}"
        }
      }
      else if [service][type] == "redis" {
        elasticsearch {
          hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
          user => "elastic"
          password => "sjgpwd"
          index => "sjgredis-%{+YYYY.MM.dd}"
        }
      }
      else {
        elasticsearch {
          hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
          user => "elastic"
          password => "sjgpwd"
          index => "sjgother-%{+YYYY.MM.dd}"
        }
      }
    }
Logstash routing Metricbeat data from Kafka to separate ES indices
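Here the routing key [service][type] is the service.type field that Metricbeat modules (e.g. mysql, redis) set on each event; a sketch of the Metricbeat output (brokers assumed to match the config above):

output.kafka:
  hosts: ["172.17.166.217:9092", "172.17.166.218:9092", "172.17.166.219:9092"]
  topic: test2           # the kafka input above consumes from test2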