zoukankan      html  css  js  c++  java
  • ELK收集tomcat访问日志并存取mysql数据库案例

    这个案例中,tomcat产生的日志由filebeat收集,然后存取到redis中,再由logstash进行过滤清洗等操作,最后由elasticsearch存储索引并由kibana进行展示。

    1、配置tomcat自定义日志

    <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
                   prefix="localhost_access_log" suffix=".log"
                   pattern="{&quot;client_ip&quot;:&quot;%{X-Forwarded-For}i&quot;,&quot;direct_ip&quot;:&quot;%a&quot;,&quot;client user&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;timestamp&quot;:&quot;%{yyyy-MM-dd HH:mm:ss Z}t&quot;,&quot;request_method&quot;:&quot;%m&quot;,&quot;URI&quot;:&quot;%U&quot;,&quot;Protocol&quot;:&quot;%H&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;send bytes&quot;:&quot;%B&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;Referer&quot;:&quot;%{Referer}i&quot;,&quot;Agent version&quot;:&quot;%{User-Agent}i&quot;}"/>

    需要注意的是第一个X-Forwarded-For表示获取一个ip列表,但只有第一个ip是真正的客户端ip,不过如果没有代理,直接访问tomcat,真正的ip是direct_ip。还有一个是时间格式,这里采用自定义时间格式,没有使用“%t”参数。

    2、配置filebeat

    - type: log
    
      # Change to true to enable this input configuration.
      enabled: true
    
      # Paths that should be crawled and fetched. Glob based paths.
      paths:
        - /usr/local/tomcat/logs/localhost_access_log.*.log
        #- c:programdataelasticsearchlogs*
      document_type: tomcat-accesslog
    output.redis:
            enable: true
            hosts: ["172.16.0.54:6379"]
            db: 3
            timeout: 5
            key: tomcat_accesslog_filter_index
            password: 123456

    3、配置mysql

    1)安装mariadb-server以及创建用户并授权访问mysql(最好不使用root用户)。

    2)安装驱动步骤

    先下载mysql-connector-java驱动:wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.47.tar.gz

    然后在/usr/local/logstash/vendor目录下创建目录“(我这里是源码安装的logstash)

    mkdir -pv  /usr/local/logstash/vendor/jar/jdbc

    将上面的mysql驱动包(带有bin关键字的那个)移动到此目录下。

    3)安装gem

    yum install gem -y

    然后配置gem源

    gem sources --remove https://rubygems.org/ //或者
    gem sources --add http://gems.ruby-china.com/

    最后安装插件

    cd /usr/local/logstash/bin
    ./logstash-plugin install logstash-output-jdbc

    最后验证是否安装成功

    /usr/local/logstash/bin/logstash-plugin | grep jdbc

    [root@ELK-chaofeng07 logstash]# /usr/local/logstash/bin/logstash-plugin list | grep jdbc
    logstash-input-jdbc
    logstash-output-jdbc

    4)进行配置logstash的配置文件,以.conf结尾即可

    input{
            redis {
                    host =>"172.16.0.54"
                    port => 6379
                    data_type => "list"
                    db => "3"
                    password => "123456"
                    key => "tomcat_accesslog_filter_index"
                    codec => "json"
                    add_field => {
                            "[@metadata][mytomcat]" => "tomcat_accesslog_filter_log"
                    }
            }
    }
    
    
    filter {
            if [@metadata][mytomcat] == "tomcat_accesslog_filter_log" {
                    mutate {
                            gsub => ["message","\x","\x"]
                    }
                    if ( 'method":"HEAD' in [message] ) {
                            drop{}
                    }
                    json {
                            source => "message"
                            add_field => {"[@metadata][direct_ip]" => "%{direct_ip}"}
                            remove_field => "message"
                            remove_field => "prospector"
                            remove_field => "beat"
                            remove_field => "host"
                            remove_field => "input"
                            remove_field => "source"
                            remove_field => "offset"
                            remove_field => "fields"
                            remove_field => "@version"
                    }
                    date {
                            match => [ "timestamp","yyyy-MM-dd HH:mm:ss Z" ]
                    }
    mutate {
                            split => ["client_ip",","]
                    }
                    mutate {
                            convert => ["body_bytes_sent","integer"]
                            convert => ["total_bytes_sent","integer"]
                    }
                    mutate {
                            replace => { "client_ip" => "%{client_ip[0]}"}
                    }
                    if [client_ip] == "-" {
                            if [@metadata][direct_ip] not in ["%{direct_ip}","-"]{
                                    mutate {
                                            replace => { "client_ip" => "%{direct_ip}" }
                                    }
                            } else {
                                    drop {}
                            }
                    }
                    geoip {
                            source => "client_ip"
                            target => ["geoip"]
                            add_field => ["[geoip][coordinates]", "%{[geoip][longitude]}"]
                            add_field => ["[geoip][coordinates]", "%{[geoip][latitude]}"]
                    }
                    mutate {
                            convert => ["[geoip][coordinates]","float"]
                    }
                    mutate {
                            remove_field => ["direct_ip"]
                            remove_field => ["timestamp"]
                    }
            }
    }
    output{
            if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] and "_geoip_lookup_failure" not in [tags] {
               jdbc{
                  connection_string => "jdbc:mysql://172.16.0.57:3306/elk?user=chaofeng&password=123456&useUnicode=true&characterEncoding=UTF8"
                  statement => ["INSERT INTO elklog(client_ip)VALUES(?)","client_ip"]
               }    
               stdout{
                    codec => rubydebug
               }
    }

    5)在mysql中创建数据库和表以及表结构

    创建的数据库是:elk

    创建的表是:elklog

    表结构如下:

    MariaDB [elk]> desc elklog;
    +-----------+-------------+------+-----+---------+-------+
    | Field     | Type        | Null | Key | Default | Extra |
    +-----------+-------------+------+-----+---------+-------+
    | client_ip | varchar(20) | YES  |     | NULL    |       |
    +-----------+-------------+------+-----+---------+-------+
    1 row in set (0.01 sec)

    (6)效果图

    难点:tomcat中的时间最好也进行自定义格式化,不然不好整

  • 相关阅读:
    彻底弄懂CSS盒子模式
    什么是 MDAC、 DA SDK、 ODBC、 OLEDB、 ADO、 RDS, 和 ADO / MD
    多线程编程之一——问题提出
    DECLARE_HANDLE
    15.3简单多线程示例
    为什么有的时候Win32 Console Application新创建的子线程得不到运行
    多线程CreateThread函数的用法及注意事项
    多线程编程之二——MFC中的多线程开发
    listbox 使用笔记
    Command 对象
  • 原文地址:https://www.cnblogs.com/FengGeBlog/p/10557794.html
Copyright © 2011-2022 走看看