zoukankan      html  css  js  c++  java
  • ELK日志平台搭建

    功能:

    1、  查看当天的服务器日志信息(要求:在出现警告甚至警告级别以上的都要查询)
    2、 能够查看服务器的所有用户的操作日志
    3、 能够查询nginx服务器采集的日志(kibana作图)
    4、 查看tomcat+log4j运行日志
    5、 Mysql的慢查询日志

    3.1:采集服务器日志

    Rsyslog是CentOS6.X自带的一款系统日志工具:

    1.支持多线程
    2.支持TCP,SSL,TLS,RELP等协议
    3.支持将日志写入MySQL, PGSQL, Oracle等多种关系型数据中
    4.拥有强大的过滤器,可实现过滤系统信息中的任意部分
    5.可以自定义日志输出格式

    对于ELK stack来说,我们需要实时的知道当前系统运行的情况,如果当前系统出现了问题,能够及时发现,以免影响线上实例

    Rsyslog配置文件介绍:/etc/rsyslog.conf文件:

    *.info;mail.none;authpriv.none;cron.none/var/log/messages.各类型日志存放位置
    cron.* /var/log/cron 具体日志存放的位置
    authpriv.* /var/log/secure 认证授权认证
    mail.* -/var/log/maillog 邮件日志
    cron.* /var/log/cron 任务计划相关日志
    kern   内核相关日志
    lpr   打印
    mark(syslog)   rsyslog服务内部的信息,时间标识
    news   新闻组
    user   用户程序产生的相关信息
    uucp   协议
    local 0~7   用户自定义日志级别

    日志级别:

    rsyslog共有7种日志级别,数字代号从 0~7。具体的意义如下所示:

    0 debug       –有调式信息的,日志信息最多

    1 info         一般信息的日志,最常用

    2 notice       –最具有重要性的普通条件的信息

    3 warning     –警告级别

    4 err         –错误级别,阻止某个功能或者模块不能正常工作的信息

    5 crit         –严重级别,阻止整个系统或者整个软件不能正常工作的信息

    6 alert       –需要立刻修改的信息

    7 emerg     –内核崩溃等严重信息

    本项目中,将日志界别调整成3 warning:

    local3.*                                                /var/log/boot.log
    *.warning                                               /var/log/warning_Log

    然后将日志信息发送至6789端口:

    *.* @@hadoop01:6789

    这样系统在生成日志,同时也会将日志发送到6789端口

    重启日志:/etc/init.d/rsyslog restart

    编写logstash:Vim rsyslog.conf

    input {
      tcp {
      port => "6789"   #监控6789端口
      type => "rsyslog"   #日志类型是rsyslog
      }
    }
    filter {
     if [type] == "rsyslog" {   # 做一次判断,只要从6789端口过来的rsyslog日志
      grok { # 通过正则表达式,取出想要的字段
        match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }
        add_field => [ "received_at", "%{@timestamp}" ]
        add_field => [ "received_from", "%{host}" ]
      }
      date {
        match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] #将系统的日志格式化成标准国际化时间
      }
    }
    }
    output{   #将日志打入elasticsearch
       if [type] == "rsyslog"{
          stdout{codec=>rubydebug}
          elasticsearch {
          action => "index"
          hosts  => "hadoop01:9200"
          index  => "logstash-%{type}-%{+yyyy.MM.dd}"
      }
    }
    }

    启动logstash:

    bin/logstash -f /usr/local/elk/logstash-5.5.2/conf/template/rsyslog.conf

    通过telnet hadoop01 6789传输数据:

    【注意,在logstash中的grok是正则表达式,用来解析当前数据】

    Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
    Jun 05 08:00:00 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
    Jun 05 08:10:00 louis CRON[620]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
    Jun 05 08:05:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.

    然后去ip:9100查看日志是否生成:

    3.2:采集服务器的所有用户的操作日志

    用户在命令行环境下的操作日志都会被系统记录下来;比如我们输入history命令,都会展示出每一个用户输入过的命令;

    .bash_history文件,这个日志格式可以定义成我们需要显示的内容,方便我们排查或者做入侵检查的时候使用;

    自定义日志格式:

    HISTFILESIZE=4000     #保存命令的记录总数
    HISTSIZE=4000         # history 命令输出的记录数
    HISTTIMEFORMAT='%F %T'   #输出时间格式
    export HISTTIMEFORMAT. #自定义日志输出格式,也就是取出我们想要的字段,以json的形式
    HISTTIMEFORMAT修改线上的相关格式
    PROMPT_COMMAND实时记录历史命令(一般用在存储history命令文件中)

    vim /etc/bashrc


    HISTDIR='/var/log/command.log'
    if [ ! -f $HISTDIR ];then
    touch $HISTDIR
    chmod 666 $HISTDIR
    fi


    export HISTTIMEFORMAT="{"TIME":"%F%T","HOSTNAME":"$HOSTNAME","LI":"$(who am i 2>/dev/null| awk '{print $NF}'|sed -e's/[()]//g')","LOGIN_USER":"$(who am i|awk '{print$1}')","CHECK_USER":"${USER}","CMD":""

    export PROMPT_COMMAND='history 1|tail -1|sed "s/^[ ]+[0-9]+ //"|sed "s/$/"}/">>/var/log/command.log'

    export PROMPT_COMMAND='history >> /var/log/command.log'

    最后source /etc/bashrc

    配置logstash:Vim /conf/history.conf

    input {
      file {
          path => ["/var/log/command.log"]
          type => "command"
          codec => "json"
      }
    }
    output{
       if [type] == "command"{
          stdout{codec=>rubydebug}
          elasticsearch {
          hosts  => "hadoop01:9200"
          index  => "history-%{+yyyy.MM.dd}"
      }
    }
    }

    启动logstash:

    bin/logstash -f /usr/local/elk/logstash-5.5.2/conf/template/history.conf

    去9100页面,查看是否已经把history日志灌入elasticsearch

    3.3:项目之采集nginx日志

    在企业中, 日志量非常大,如果直接采用:

    会出现这样一种情况:logstash瞬间采集大量日志(同一时间要存储的数据量已经超过elasticsearch的最大连接数),这个时候elasticsearch会忽略继续存储的数据,也就是所谓的丢数据现象;

    那么为了不让logstash采集的数据突发、井喷的方式将数据灌入elasticsearch;最合理的方式是在中间介入缓冲队列:kafka、rubbitMQ、redis等

    我们采用的架构师:

    1):配置nginx中的日志

    Nginx中生成的日志格式:

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                        '$status $body_bytes_sent "$http_referer" '
                        '"$http_user_agent" "$http_x_forwarded_for"';

    Nginx采集的日志配置

    2):配置agent

    agent的目的是将nginx中的日志采集到hadoop01的redis中,这种agent只是做日志发送,对性能影响不大,读取access.log.1日志文件,并且发送到远端redis。

    配置:Vim nginx.conf

    input {
      file {
          path => ["/usr/local/nginx/logs/agent.log"]
          type => "nginx_access"
      }
    }
    output {
    if [type] == "nginx_access"{
      redis {
      host => ["hadoop01:6379"]
      data_type =>"list"
      db => 3
      key => "agent"
      }
    }
    stdout{codec=>rubydebug}
    }

    启动logstash

    bin/logstash -f myconf/nginx_redis.conf --path.data=/home/angel/logstash-5.5.2/logs

    使用实际成产数据做测试数据,方便观察地理位置:

    cat /usr/local/generator_nginx_data/access.log >> /usr/local/nginx/logs/access.log.1

    观察redis是否出现nginx的key:

    3):配置indexer

    Vim indexer.conf

    input{
    redis {
      host => "hadoop01"
      port => 6379
      data_type => "list"
      key => "agent"
      db => 3
    }
    }
    filter {
      date {
          match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
      }
      grok {
              match => {
                  "message" => "%{IPORHOST:remote_addr} - %{NGUSER:remote_addr} [%{HTTPDATE:time_local}] "(?:%{WORD:request} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS:http_referer} %{QS:agent} %{NOTSPACE:http_x_forwarded_for}"
              }
          }
      geoip{
                   source => "remote_addr"
                  database => "/home/angel/logstash-5.5.2/conf/GeoLite2-City.mmdb"
                  target => "geoip"
                  add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                  add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
                   #fields => ["country_name", "region_name", "city_name", "latitude", "longitude"]
          }
        mutate {
              convert => [ "[geoip][location]", "geo_point" ]
      }
    }
    output {
          stdout{codec=>rubydebug}
          elasticsearch {
                  action => "index"
                  hosts =>"hadoop01:9200"
                  index => "redis-es-%{+yyyy.MM.dd}"
        }
    }

    启动:bin/logstash -f /usr/local/elk/logstash-5.5.2/conf/template/indexer.conf

    然后观察elasticsearch,是否已经插入数据;

    Kibana生成高德地图:

    3.4:项目之采集tomcat日志

    Logstash分析tomcat日志是最复杂的,没有之一;

    下面给出tomcat的日志格式:

    Caused by: org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
           at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
           at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
           at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:92)
           at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:80)
           at com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperClient.<init>(ZkclientZookeeperClient.java:29)
           at com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperTransporter.connect(ZkclientZookeeperTransporter.java:10)
           at com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter$Adpative.connect(ZookeeperTransporter$Adpative.java)
           at com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.<init>(ZookeeperRegistry.java:69)
           at com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory.createRegistry(ZookeeperRegistryFactory.java:37)
           at com.alibaba.dubbo.registry.support.AbstractRegistryFactory.getRegistry(AbstractRegistryFactory.java:94)
           at com.alibaba.dubbo.registry.RegistryFactory$Adpative.getRegistry(RegistryFactory$Adpative.java)
           at com.alibaba.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:240)
           at com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:63)
           at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:60)
           at com.alibaba.dubbo.rpc.Protocol$Adpative.refer(Protocol$Adpative.java)
           at com.alibaba.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:392)
           at com.alibaba.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:300)
           at com.alibaba.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:138)
           at com.alibaba.dubbo.config.spring.ReferenceBean.getObject(ReferenceBean.java:65)
           at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:142)
          ... 50 more
    INFO config.AbstractConfig - [DUBBO] Run shutdown hook now., dubbo version: 2.8.4, current host: 192.168.77.10
    INFO config.AbstractConfig - [DUBBO] Run shutdown hook now., dubbo version: 2.8.4, current host: 192.168.77.10
    INFO support.AbstractRegistryFactory - [DUBBO] Close all registries [], dubbo version: 2.8.4, current host: 192.168.77.10

    在tomcat的catlina.out中,这样的日种子是最常见的;

    但是这样的日志有个问题:一条完整的日志,tomcat会按照多行的方式进行输出;

    有的同学会说没关系,因为前面学习了mutiline进行日志的合并,那么下面我们演示一下

    3.4.1:方式1:mutiline

    使用mutiline之前,需要安装插件:

    bin/logstash-plugin install logstash-filter-multiline

    如果报错,请使用

    bin/logstash-plugin install --version 3.0.4 logstash-filter-multiline

    如果还是报错,请下载logstash-filter-multiline-3.0.4.gem

    然后:bin/logstash-plugin install /home/angel/logstash-5.5.2/vendor/bundle/jruby/1.9/cache/logstash-filter-multiline-3.0.4.gem

    配置logstash的采集文件:vim tomcat.conf

    input{
      file {
          path => ["/home/angel/servers/apache-tomcat-7.0.84/logs/catalina.out"]
      }
    }
    filter {
          multiline {
                  pattern => "(^%{CATALINA_DATESTAMP})"  #使用pattern中的java库
                  negate => true
                  what => "previous". #如果正则表达式匹配了,那么该事件是属于下一个(属于下一次触发的事件)还是上一个(触发的事件)
          }
           if "_grokparsefailure" in [tags] {
              drop { }
          }
            grok {
                    match => [ "message", "%{CATALINALOG}" ]
          }
          date {
                    match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,SSS Z", "MMM dd, yyyy HH:mm:ss a" ]
          }
      }
    output{
          stdout{codec=>rubydebug}
          elasticsearch {
                  action => "index"
                  hosts =>"hadoop01:9200"
                  index => "tomcat_mutiline-%{+yyyy.MM.dd}"
        }

    }

    启动tomcat,然后观察日志输出情况:直接在9100页面查看

    信息全部积压在一个message钟,可读性非常的差,后期在排查日志的时候,几乎无法排查;

    3.4.2:自定义tomcat的输出格式

    1):下载包:

    tomcat-juli-adapters.jar(lib目录下)

    tomcat-juli.jar(bin目录下)

    log4j-1.2.17.jar(lib目录下)

    2):然后将tomcat的conf文件夹下的:logging.properties删除掉或者重命名

    3):最后在tomcat的lib下导入log4j.properties

    4):重启tomcr成json日志文件:

    5):配置logstash:Vim tomcat2.conf

    input {
    file {
      codec => json
      path => "/home/angel/servers/apache-tomcat-7.0.84/logs/catalina"
      type => "log4j"
      start_position => "beginning"
      sincedb_path => "/dev/null"
    }
    }
    output{
      stdout{codec=>rubydebug}
      if[type] == "log4j"{
          elasticsearch {
                  action => "index"
                  hosts => "hadoop01:9200"
                  index => "tomcat_json-%{+yyyy.MM.dd}"
        }
    }
    }

    最终效果截图:

    我们可以看到的确比第一次的mutiline好了很多,但是仍有不足的地方:很多地方空行,有的地方一行非常的大,不利于后期的错误定位

    3.4.3:log4j-jsonevent-layout方式

    1):下载依赖:

    commons-lang-2.6.jar(lib目录下)

    jsonevent-layout-1.7-SNAPSHOT.jar(lib目录下)

    json-smart-1.1.1.jar(lib目录下)

    tomcat-juli-adapters.jar(lib目录下)

    tomcat-juli.jar(bin目录下)

    log4j-1.2.17.jar(lib目录下)

     

    2):在tomcat的lib下编写log4j.properties

    log4j.rootCategory=info, RollingLog log4j.appender.RollingLog=org.apache.log4j.DailyRollingFileAppender

    log4j.appender.RollingLog.Threshold=TRACE

    log4j.appender.RollingLog.File=/home/angel/servers/apache-tomcat-7.0.84/logs/json_tomcat.log

    log4j.appender.RollingLog.DatePattern=.yyyy-MM-dd

    log4j.appender.RollingLog.layout=net.logstash.log4j.JSONEventLayoutV1

     

    3):重启tomcat,观察json_tomcat.log

    4):编写logstash

    input {
    file {
      codec => json
      path => "/home/angel/servers/apache-tomcat-7.0.84/logs/json_tomcat.log"
      type => "log4j"
      start_position => "beginning"
      sincedb_path => "/dev/null"
    }
    }
    output{
      stdout{codec=>rubydebug}
      if[type] == "log4j"{
          elasticsearch {
                  action => "index"
                  hosts => "hadoop01:9200"
                  index => "tomcat_json2-%{+yyyy.MM.dd}"
        }
    }
    }

    启动logstash:在9100查看:

    可以观察到,自动帮助我们匹配error日志,方便后期错误日志的定位

    3.5:采集mysql的慢查询日志

    在实际的生产中,我们也会对mysql的慢查询日志做分析:

    登录mysql,执行:

    show variables like '%slow%';

    mysql> show variables like '%slow%';
    +---------------------+------------------------------------+
    | Variable_name       | Value                             |
    +---------------------+------------------------------------+
    | log_slow_queries   | OFF             指定是否开启慢查询日志                   |
    | slow_launch_time   | 2               如果创建线程需要比slow_launch_time更多的时间,服务器会增加 Slow_launch_threads的状态变量                 |
    | slow_query_log     | OFF             指定是否开启慢查询日志                 |
    | slow_query_log_file | /usr/local/elk/mysql_data/slow.log |
    +---------------------+------------------------------------+
    4 rows in set (0.00 sec)

    1):开启慢查询方式1:

    Vim /etc/my.cnf 配置mysql的慢查询

    [mysql_slow]
    set global slow_query_log='ON';
    set global slow_query_log_file='/home/angel/servers/logstash-5.5.2/logs/slow.log';
    set global long_query_time=1;

    (做测试要求,查询超过1S , 都是慢查询操作)

    mysql> show variables like '%slow%';
    +---------------------+--------------------------------------------------+
    | Variable_name       | Value                                           |
    +---------------------+--------------------------------------------------+
    | log_slow_queries   | ON                                             |
    | slow_launch_time   | 2                                               |
    | slow_query_log     | ON                                               |
    | slow_query_log_file | /home/angel/servers/logstash-5.5.2/logs/slow.log
    +---------------------+--------------------------------------------------+
    4 rows in set (0.00 sec)

    2):开启慢查询方式2:

    将 slow_query_log 全局变量设置为“ON”状态

    mysql> set global slow_query_log='ON';
    设置慢查询日志存放的位置

    mysql> set global slow_query_log_file='/var/run/mysqld/mysqld-slow.log';
    查询超过1秒就记录

    mysql> set global long_query_time=1;
    mysql> flush privileges;

    然后做查询操作:

    use test;

    select sleep(1), tid,t_name,t_password,sex,description from t_teacher limit 10;

    观察mysql的慢查询日志:

    我们从截图可以看到往往生成的日志中:

    第一行是执行的时间

    第二行是用户信息

    第三行是相应时间和相关的行数信息

    编写logstash:vim conf/mysql.conf

    input {
      file {
        type => "mysql-slow"
        path => "/var/run/mysqld/mysqld-slow.log" #注意文件权限
        codec => multiline {
          pattern => "^# User@Host:"
          negate => true
          what => "previous"
        }
      }
    }
    
    
    filter {
      
      grok {
        match => { "message" => "SELECT SLEEP" }
        add_tag => [ "sleep_drop" ]
        tag_on_failure => [] # prevent default _grokparsefailure tag on real records
      }
      if "sleep_drop" in [tags] {
        drop {}
      }
    grok {
    match => [ "message", "(?m)^# User@Host: %{USER:user}[[^]]+] @ (?:(?<clienthost>S*) )?[(?:%{IP:clientip})?]s*# Query_time: %{NUMBER:query_time:float}s+Lock_time: %{NUMBER:lock_time:float}s+Rows_sent: %{NUMBER:rows_sent:int}s+Rows_examined: %{NUMBER:rows_examined:int}s*(?:use %{DATA:database};s*)?SET timestamp=%{NUMBER:timestamp};s*(?<query>(?<action>w+)s+.*)
    # Time:.*$" ]
    }
      
      date {
        match => [ "timestamp", "UNIX" ]
        remove_field => [ "timestamp" ]
      }
    }
    
    output {
     stdout {codec => rubydebug {}}
    elasticsearch {
                    action => "index"
                    hosts => "hadoop01:9200"
                    index => "mysql-slow-%{+yyyy.MM.dd}"
         }
    }

    相应截图:

  • 相关阅读:
    《.NET内存管理宝典 》(Pro .NET Memory Management) 阅读指南
    《.NET内存管理宝典 》(Pro .NET Memory Management) 阅读指南
    《.NET内存管理宝典 》(Pro .NET Memory Management) 阅读指南
    使用Jasmine和karma对传统js进行单元测试
    《.NET内存管理宝典 》(Pro .NET Memory Management) 阅读指南
    《.NET内存管理宝典 》(Pro .NET Memory Management) 阅读指南
    nginx 基于IP的多虚拟主机配置
    Shiro 框架的MD5加密算法实现原理
    项目实战:Qt+OSG三维点云引擎(支持原点,缩放,单独轴或者组合多轴拽拖旋转,支持导入点云文件)
    实用技巧:阿里云服务器建立公网物联网服务器(解决阿里云服务器端口,公网连接不上的问题)
  • 原文地址:https://www.cnblogs.com/niutao/p/10909481.html
Copyright © 2011-2022 走看看