zoukankan      html  css  js  c++  java
  • ELK日志平台搭建

    功能:

    1、  查看当天的服务器日志信息(要求:在出现警告甚至警告级别以上的都要查询)
    2、 能够查看服务器的所有用户的操作日志
    3、 能够查询nginx服务器采集的日志(kibana作图)
    4、 查看tomcat+log4j运行日志
    5、 Mysql的慢查询日志

    3.1:采集服务器日志

    Rsyslog是CentOS6.X自带的一款系统日志工具:

    1.支持多线程
    2.支持TCP,SSL,TLS,RELP等协议
    3.支持将日志写入MySQL, PGSQL, Oracle等多种关系型数据中
    4.拥有强大的过滤器,可实现过滤系统信息中的任意部分
    5.可以自定义日志输出格式

    对于ELK stack来说,我们需要实时的知道当前系统运行的情况,如果当前系统出现了问题,能够及时发现,以免影响线上实例

    Rsyslog配置文件介绍:/etc/rsyslog.conf文件:

    *.info;mail.none;authpriv.none;cron.none/var/log/messages.各类型日志存放位置
    cron.* /var/log/cron 具体日志存放的位置
    authpriv.* /var/log/secure 认证授权认证
    mail.* -/var/log/maillog 邮件日志
    cron.* /var/log/cron 任务计划相关日志
    kern   内核相关日志
    lpr   打印
    mark(syslog)   rsyslog服务内部的信息,时间标识
    news   新闻组
    user   用户程序产生的相关信息
    uucp   协议
    local 0~7   用户自定义日志级别

    日志级别:

    rsyslog共有7种日志级别,数字代号从 0~7。具体的意义如下所示:

    0 debug       –有调式信息的,日志信息最多

    1 info         一般信息的日志,最常用

    2 notice       –最具有重要性的普通条件的信息

    3 warning     –警告级别

    4 err         –错误级别,阻止某个功能或者模块不能正常工作的信息

    5 crit         –严重级别,阻止整个系统或者整个软件不能正常工作的信息

    6 alert       –需要立刻修改的信息

    7 emerg     –内核崩溃等严重信息

    本项目中,将日志界别调整成3 warning:

    local3.*                                                /var/log/boot.log
    *.warning                                               /var/log/warning_Log

    然后将日志信息发送至6789端口:

    *.* @@hadoop01:6789

    这样系统在生成日志,同时也会将日志发送到6789端口

    重启日志:/etc/init.d/rsyslog restart

    编写logstash:Vim rsyslog.conf

    input {
      tcp {
      port => "6789"   #监控6789端口
      type => "rsyslog"   #日志类型是rsyslog
      }
    }
    filter {
     if [type] == "rsyslog" {   # 做一次判断,只要从6789端口过来的rsyslog日志
      grok { # 通过正则表达式,取出想要的字段
        match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }
        add_field => [ "received_at", "%{@timestamp}" ]
        add_field => [ "received_from", "%{host}" ]
      }
      date {
        match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] #将系统的日志格式化成标准国际化时间
      }
    }
    }
    output{   #将日志打入elasticsearch
       if [type] == "rsyslog"{
          stdout{codec=>rubydebug}
          elasticsearch {
          action => "index"
          hosts  => "hadoop01:9200"
          index  => "logstash-%{type}-%{+yyyy.MM.dd}"
      }
    }
    }

    启动logstash:

    bin/logstash -f /usr/local/elk/logstash-5.5.2/conf/template/rsyslog.conf

    通过telnet hadoop01 6789传输数据:

    【注意,在logstash中的grok是正则表达式,用来解析当前数据】

    Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
    Jun 05 08:00:00 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
    Jun 05 08:10:00 louis CRON[620]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
    Jun 05 08:05:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.

    然后去ip:9100查看日志是否生成:

    3.2:采集服务器的所有用户的操作日志

    用户在命令行环境下的操作日志都会被系统记录下来;比如我们输入history命令,都会展示出每一个用户输入过的命令;

    .bash_history文件,这个日志格式可以定义成我们需要显示的内容,方便我们排查或者做入侵检查的时候使用;

    自定义日志格式:

    HISTFILESIZE=4000     #保存命令的记录总数
    HISTSIZE=4000         # history 命令输出的记录数
    HISTTIMEFORMAT='%F %T'   #输出时间格式
    export HISTTIMEFORMAT. #自定义日志输出格式,也就是取出我们想要的字段,以json的形式
    HISTTIMEFORMAT修改线上的相关格式
    PROMPT_COMMAND实时记录历史命令(一般用在存储history命令文件中)

    vim /etc/bashrc


    HISTDIR='/var/log/command.log'
    if [ ! -f $HISTDIR ];then
    touch $HISTDIR
    chmod 666 $HISTDIR
    fi


    export HISTTIMEFORMAT="{"TIME":"%F%T","HOSTNAME":"$HOSTNAME","LI":"$(who am i 2>/dev/null| awk '{print $NF}'|sed -e's/[()]//g')","LOGIN_USER":"$(who am i|awk '{print$1}')","CHECK_USER":"${USER}","CMD":""

    export PROMPT_COMMAND='history 1|tail -1|sed "s/^[ ]+[0-9]+ //"|sed "s/$/"}/">>/var/log/command.log'

    export PROMPT_COMMAND='history >> /var/log/command.log'

    最后source /etc/bashrc

    配置logstash:Vim /conf/history.conf

    input {
      file {
          path => ["/var/log/command.log"]
          type => "command"
          codec => "json"
      }
    }
    output{
       if [type] == "command"{
          stdout{codec=>rubydebug}
          elasticsearch {
          hosts  => "hadoop01:9200"
          index  => "history-%{+yyyy.MM.dd}"
      }
    }
    }

    启动logstash:

    bin/logstash -f /usr/local/elk/logstash-5.5.2/conf/template/history.conf

    去9100页面,查看是否已经把history日志灌入elasticsearch

    3.3:项目之采集nginx日志

    在企业中, 日志量非常大,如果直接采用:

    会出现这样一种情况:logstash瞬间采集大量日志(同一时间要存储的数据量已经超过elasticsearch的最大连接数),这个时候elasticsearch会忽略继续存储的数据,也就是所谓的丢数据现象;

    那么为了不让logstash采集的数据突发、井喷的方式将数据灌入elasticsearch;最合理的方式是在中间介入缓冲队列:kafka、rubbitMQ、redis等

    我们采用的架构师:

    1):配置nginx中的日志

    Nginx中生成的日志格式:

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                        '$status $body_bytes_sent "$http_referer" '
                        '"$http_user_agent" "$http_x_forwarded_for"';

    Nginx采集的日志配置

    2):配置agent

    agent的目的是将nginx中的日志采集到hadoop01的redis中,这种agent只是做日志发送,对性能影响不大,读取access.log.1日志文件,并且发送到远端redis。

    配置:Vim nginx.conf

    input {
      file {
          path => ["/usr/local/nginx/logs/agent.log"]
          type => "nginx_access"
      }
    }
    output {
    if [type] == "nginx_access"{
      redis {
      host => ["hadoop01:6379"]
      data_type =>"list"
      db => 3
      key => "agent"
      }
    }
    stdout{codec=>rubydebug}
    }

    启动logstash

    bin/logstash -f myconf/nginx_redis.conf --path.data=/home/angel/logstash-5.5.2/logs

    使用实际成产数据做测试数据,方便观察地理位置:

    cat /usr/local/generator_nginx_data/access.log >> /usr/local/nginx/logs/access.log.1

    观察redis是否出现nginx的key:

    3):配置indexer

    Vim indexer.conf

    input{
    redis {
      host => "hadoop01"
      port => 6379
      data_type => "list"
      key => "agent"
      db => 3
    }
    }
    filter {
      date {
          match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
      }
      grok {
              match => {
                  "message" => "%{IPORHOST:remote_addr} - %{NGUSER:remote_addr} [%{HTTPDATE:time_local}] "(?:%{WORD:request} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS:http_referer} %{QS:agent} %{NOTSPACE:http_x_forwarded_for}"
              }
          }
      geoip{
                   source => "remote_addr"
                  database => "/home/angel/logstash-5.5.2/conf/GeoLite2-City.mmdb"
                  target => "geoip"
                  add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                  add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
                   #fields => ["country_name", "region_name", "city_name", "latitude", "longitude"]
          }
        mutate {
              convert => [ "[geoip][location]", "geo_point" ]
      }
    }
    output {
          stdout{codec=>rubydebug}
          elasticsearch {
                  action => "index"
                  hosts =>"hadoop01:9200"
                  index => "redis-es-%{+yyyy.MM.dd}"
        }
    }

    启动:bin/logstash -f /usr/local/elk/logstash-5.5.2/conf/template/indexer.conf

    然后观察elasticsearch,是否已经插入数据;

    Kibana生成高德地图:

    3.4:项目之采集tomcat日志

    Logstash分析tomcat日志是最复杂的,没有之一;

    下面给出tomcat的日志格式:

    Caused by: org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
           at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
           at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
           at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:92)
           at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:80)
           at com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperClient.<init>(ZkclientZookeeperClient.java:29)
           at com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperTransporter.connect(ZkclientZookeeperTransporter.java:10)
           at com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter$Adpative.connect(ZookeeperTransporter$Adpative.java)
           at com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.<init>(ZookeeperRegistry.java:69)
           at com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory.createRegistry(ZookeeperRegistryFactory.java:37)
           at com.alibaba.dubbo.registry.support.AbstractRegistryFactory.getRegistry(AbstractRegistryFactory.java:94)
           at com.alibaba.dubbo.registry.RegistryFactory$Adpative.getRegistry(RegistryFactory$Adpative.java)
           at com.alibaba.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:240)
           at com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:63)
           at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:60)
           at com.alibaba.dubbo.rpc.Protocol$Adpative.refer(Protocol$Adpative.java)
           at com.alibaba.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:392)
           at com.alibaba.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:300)
           at com.alibaba.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:138)
           at com.alibaba.dubbo.config.spring.ReferenceBean.getObject(ReferenceBean.java:65)
           at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:142)
          ... 50 more
    INFO config.AbstractConfig - [DUBBO] Run shutdown hook now., dubbo version: 2.8.4, current host: 192.168.77.10
    INFO config.AbstractConfig - [DUBBO] Run shutdown hook now., dubbo version: 2.8.4, current host: 192.168.77.10
    INFO support.AbstractRegistryFactory - [DUBBO] Close all registries [], dubbo version: 2.8.4, current host: 192.168.77.10

    在tomcat的catlina.out中,这样的日种子是最常见的;

    但是这样的日志有个问题:一条完整的日志,tomcat会按照多行的方式进行输出;

    有的同学会说没关系,因为前面学习了mutiline进行日志的合并,那么下面我们演示一下

    3.4.1:方式1:mutiline

    使用mutiline之前,需要安装插件:

    bin/logstash-plugin install logstash-filter-multiline

    如果报错,请使用

    bin/logstash-plugin install --version 3.0.4 logstash-filter-multiline

    如果还是报错,请下载logstash-filter-multiline-3.0.4.gem

    然后:bin/logstash-plugin install /home/angel/logstash-5.5.2/vendor/bundle/jruby/1.9/cache/logstash-filter-multiline-3.0.4.gem

    配置logstash的采集文件:vim tomcat.conf

    input{
      file {
          path => ["/home/angel/servers/apache-tomcat-7.0.84/logs/catalina.out"]
      }
    }
    filter {
          multiline {
                  pattern => "(^%{CATALINA_DATESTAMP})"  #使用pattern中的java库
                  negate => true
                  what => "previous". #如果正则表达式匹配了,那么该事件是属于下一个(属于下一次触发的事件)还是上一个(触发的事件)
          }
           if "_grokparsefailure" in [tags] {
              drop { }
          }
            grok {
                    match => [ "message", "%{CATALINALOG}" ]
          }
          date {
                    match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,SSS Z", "MMM dd, yyyy HH:mm:ss a" ]
          }
      }
    output{
          stdout{codec=>rubydebug}
          elasticsearch {
                  action => "index"
                  hosts =>"hadoop01:9200"
                  index => "tomcat_mutiline-%{+yyyy.MM.dd}"
        }

    }

    启动tomcat,然后观察日志输出情况:直接在9100页面查看

    信息全部积压在一个message钟,可读性非常的差,后期在排查日志的时候,几乎无法排查;

    3.4.2:自定义tomcat的输出格式

    1):下载包:

    tomcat-juli-adapters.jar(lib目录下)

    tomcat-juli.jar(bin目录下)

    log4j-1.2.17.jar(lib目录下)

    2):然后将tomcat的conf文件夹下的:logging.properties删除掉或者重命名

    3):最后在tomcat的lib下导入log4j.properties

    4):重启tomcr成json日志文件:

    5):配置logstash:Vim tomcat2.conf

    input {
    file {
      codec => json
      path => "/home/angel/servers/apache-tomcat-7.0.84/logs/catalina"
      type => "log4j"
      start_position => "beginning"
      sincedb_path => "/dev/null"
    }
    }
    output{
      stdout{codec=>rubydebug}
      if[type] == "log4j"{
          elasticsearch {
                  action => "index"
                  hosts => "hadoop01:9200"
                  index => "tomcat_json-%{+yyyy.MM.dd}"
        }
    }
    }

    最终效果截图:

    我们可以看到的确比第一次的mutiline好了很多,但是仍有不足的地方:很多地方空行,有的地方一行非常的大,不利于后期的错误定位

    3.4.3:log4j-jsonevent-layout方式

    1):下载依赖:

    commons-lang-2.6.jar(lib目录下)

    jsonevent-layout-1.7-SNAPSHOT.jar(lib目录下)

    json-smart-1.1.1.jar(lib目录下)

    tomcat-juli-adapters.jar(lib目录下)

    tomcat-juli.jar(bin目录下)

    log4j-1.2.17.jar(lib目录下)

     

    2):在tomcat的lib下编写log4j.properties

    log4j.rootCategory=info, RollingLog log4j.appender.RollingLog=org.apache.log4j.DailyRollingFileAppender

    log4j.appender.RollingLog.Threshold=TRACE

    log4j.appender.RollingLog.File=/home/angel/servers/apache-tomcat-7.0.84/logs/json_tomcat.log

    log4j.appender.RollingLog.DatePattern=.yyyy-MM-dd

    log4j.appender.RollingLog.layout=net.logstash.log4j.JSONEventLayoutV1

     

    3):重启tomcat,观察json_tomcat.log

    4):编写logstash

    input {
    file {
      codec => json
      path => "/home/angel/servers/apache-tomcat-7.0.84/logs/json_tomcat.log"
      type => "log4j"
      start_position => "beginning"
      sincedb_path => "/dev/null"
    }
    }
    output{
      stdout{codec=>rubydebug}
      if[type] == "log4j"{
          elasticsearch {
                  action => "index"
                  hosts => "hadoop01:9200"
                  index => "tomcat_json2-%{+yyyy.MM.dd}"
        }
    }
    }

    启动logstash:在9100查看:

    可以观察到,自动帮助我们匹配error日志,方便后期错误日志的定位

    3.5:采集mysql的慢查询日志

    在实际的生产中,我们也会对mysql的慢查询日志做分析:

    登录mysql,执行:

    show variables like '%slow%';

    mysql> show variables like '%slow%';
    +---------------------+------------------------------------+
    | Variable_name       | Value                             |
    +---------------------+------------------------------------+
    | log_slow_queries   | OFF             指定是否开启慢查询日志                   |
    | slow_launch_time   | 2               如果创建线程需要比slow_launch_time更多的时间,服务器会增加 Slow_launch_threads的状态变量                 |
    | slow_query_log     | OFF             指定是否开启慢查询日志                 |
    | slow_query_log_file | /usr/local/elk/mysql_data/slow.log |
    +---------------------+------------------------------------+
    4 rows in set (0.00 sec)

    1):开启慢查询方式1:

    Vim /etc/my.cnf 配置mysql的慢查询

    [mysql_slow]
    set global slow_query_log='ON';
    set global slow_query_log_file='/home/angel/servers/logstash-5.5.2/logs/slow.log';
    set global long_query_time=1;

    (做测试要求,查询超过1S , 都是慢查询操作)

    mysql> show variables like '%slow%';
    +---------------------+--------------------------------------------------+
    | Variable_name       | Value                                           |
    +---------------------+--------------------------------------------------+
    | log_slow_queries   | ON                                             |
    | slow_launch_time   | 2                                               |
    | slow_query_log     | ON                                               |
    | slow_query_log_file | /home/angel/servers/logstash-5.5.2/logs/slow.log
    +---------------------+--------------------------------------------------+
    4 rows in set (0.00 sec)

    2):开启慢查询方式2:

    将 slow_query_log 全局变量设置为“ON”状态

    mysql> set global slow_query_log='ON';
    设置慢查询日志存放的位置

    mysql> set global slow_query_log_file='/var/run/mysqld/mysqld-slow.log';
    查询超过1秒就记录

    mysql> set global long_query_time=1;
    mysql> flush privileges;

    然后做查询操作:

    use test;

    select sleep(1), tid,t_name,t_password,sex,description from t_teacher limit 10;

    观察mysql的慢查询日志:

    我们从截图可以看到往往生成的日志中:

    第一行是执行的时间

    第二行是用户信息

    第三行是相应时间和相关的行数信息

    编写logstash:vim conf/mysql.conf

    input {
      file {
        type => "mysql-slow"
        path => "/var/run/mysqld/mysqld-slow.log" #注意文件权限
        codec => multiline {
          pattern => "^# User@Host:"
          negate => true
          what => "previous"
        }
      }
    }
    
    
    filter {
      
      grok {
        match => { "message" => "SELECT SLEEP" }
        add_tag => [ "sleep_drop" ]
        tag_on_failure => [] # prevent default _grokparsefailure tag on real records
      }
      if "sleep_drop" in [tags] {
        drop {}
      }
    grok {
    match => [ "message", "(?m)^# User@Host: %{USER:user}[[^]]+] @ (?:(?<clienthost>S*) )?[(?:%{IP:clientip})?]s*# Query_time: %{NUMBER:query_time:float}s+Lock_time: %{NUMBER:lock_time:float}s+Rows_sent: %{NUMBER:rows_sent:int}s+Rows_examined: %{NUMBER:rows_examined:int}s*(?:use %{DATA:database};s*)?SET timestamp=%{NUMBER:timestamp};s*(?<query>(?<action>w+)s+.*)
    # Time:.*$" ]
    }
      
      date {
        match => [ "timestamp", "UNIX" ]
        remove_field => [ "timestamp" ]
      }
    }
    
    output {
     stdout {codec => rubydebug {}}
    elasticsearch {
                    action => "index"
                    hosts => "hadoop01:9200"
                    index => "mysql-slow-%{+yyyy.MM.dd}"
         }
    }

    相应截图:

  • 相关阅读:
    Ansible命令介绍之ansible
    Ansible命令介绍
    Ansible配置文件讲解
    博客搬家。新博客地址 http://fangjian0423.github.io/
    SpringMVC源码分析系列
    MyBatis拦截器原理探究
    通过源码分析MyBatis的缓存
    ThreadLocal原理及其实际应用
    logstash搭建日志追踪系统
    Mybatis解析动态sql原理分析
  • 原文地址:https://www.cnblogs.com/niutao/p/10909481.html
Copyright © 2011-2022 走看看