zoukankan      html  css  js  c++  java
  • ELK 日志分析实例

    ELK 日志分析实例
    一、ELK-web日志分析
    二、ELK-MySQL 慢查询日志分析
    三、ELK-SSH登陆日志分析
    四、ELK-vsftpd 日志分析

    一、ELK-web日志分析

    通过logstash grok正则将web日志过滤出来,输出到Elasticsearch 搜索引擎里,通过Kibana前端展示。

     1.png

    1.1、创建logstash grok 过滤规则

    #cat ./logstahs/patterns/nginx

    NGINXACCESS %{IPORHOST:remote_addr} – – [%{HTTPDATE:time_local}] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent}

    1.2、创建logstash web日志配置文件

    #cat ./logstash/conf/ngx_log.conf

    input {
            file {
                    type => "nginx_log"
                    path => "/opt/nginx/logs/access.log"
            }
    }  
    filter {
      if [type] == "nginx_log" {
        grok {
          match => { "message" => "%{NGINXACCESS}" }
        }
        geoip {
          source => "remote_addr"
          target => "geoip"
          database => "/opt/logstash-2.0.0/conf/GeoLiteCity.dat"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        mutate {
          convert => [ "[geoip][coordinates]","float", "body_bytes_sent","float", 
              "body_bytes_sent.raw","float"]
        }
      }
    }
    
    output {
        stdout { codec => rubydebug }
        elasticsearch {
            hosts => "elk.test.com:9200"
            index => "ngx_log-%{+YYYY.MM}"
        }
    }

    1.3、创建Kibana图形

    统计httpcode状态码

    选择【Visualize】菜单,选择 【Pie chart】选项。字段选择status.raw,如下图所示:

    2.png

    统计访问前50 IP

    选择【Visualize】菜单,选择 【Vertical bar chart】选项。字段选择remote_addr.raw,如下图所示:

    3.png

    统计 403-405 状态码

    选择【Visualize】菜单,选择 【Line chart】选项。字段选择status.raw,如下图所示:

    4.png

    其它图形统计,就不详细举例了。

    详细图形展示如下:

    5.png

    6.png

    7.png

    二、ELK-MySQL 慢查询日志分析

    2.1、创建logstash grok 过滤规则

    #cat ./logstahs/patterns/mysql_slow

    MYSQLSLOW "# User@Host: %{WORD:user}[%{WORD}] @ (%{HOST:client_hostname}|) [(%{IP:client_ip}|)]",
    "# Thread_id: %{NUMBER:thread_id:int} s*Schema: (%{WORD:schema}| ) s*Last_errno: 
    %{NUMBER:last_errno:int} s*Killed: %{NUMBER:killed:int}",
    "# Query_time: %{NUMBER:query_time:float} s*Lock_time: %{NUMBER:lock_time:float} 
    s*Rows_sent: %{NUMBER:rows_sent:int} s*Rows_examined: %{NUMBER:rows_examined:int}",
    "# Bytes_sent: %{NUMBER:bytes_sent:int}",
    "(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:mysql_query}"

    2.2、创建logstash MySQL-Slow慢查询配置文件

    #cat ./logstash/conf/MySQL-Slow.conf

    input {
      file {
        type => "mysql-slow"
        path => "/var/log/mysql_slow_log.log"  
      }
    }  
    filter {
    if [type] == "mysql-slow" {  
    multiline {
        pattern => "^#|^SET"
        negate => true
        what => "previous"
    }  
    grok {
        match => { "message" => "%{MYSQLSLOW}"  }
    }
    mutate {
             gsub => [ "mysql_query", "
    ", " " ]
             gsub => [ "mysql_query", "  ", " " ]
             add_tag => "mutated_mysql_query"
    }
    multiline {
        pattern => "(# User|# Thread|# Query|# Time|# Bytes)"
        negate => false
        what => "next"
    }
    date {
        match => [ "timestamp","UNIX" ]
    }
    mutate {
        remove_field => [ "timestamp" ]
    }
    }
    }  
    output {
        stdout { codec => rubydebug }
        elasticsearch {
            hosts => "elk.test.com:9200"
            index => "mysql_slow_log-%{+YYYY.MM}"
        }
    }

    2.3、详细图形展示如下:

    8.png

    三、ELK-SSH登陆日志分析

    3.1、创建logstash grok 过滤规则

    #cat ./logstahs/patterns/ssh

    SECURELOG %{WORD:program}[%{DATA:pid}]: %{WORD:status} password for ?(invalid user)? %{WORD:USER} from %{DATA:IP} port

    SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:pam_module}(%{DATA:pam_caller}): session %{WORD:pam_session_state} for user %{USERNAME:username}(?: by %{GREEDYDATA:pam_by})?

    SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:

     

    3.2、创建logstash ssh配置文件

    #cat ./logstash/conf/ssh.conf

    input {
        file {
            type => "seclog"
            path => "/var/log/secure"
       }
    }
    filter {
    if [type] == "seclog" {
        grok {
            match => { "message" => "%{SYSLOGPAMSESSION}" }
            match => { "message" => "%{SECURELOG}" }
            match => { "message" => "%{SYSLOGBASE2}" }
        }
        geoip {
            source => "IP"
            fields => ["city_name"]
            database => "/opt/logstash-2.0.0/conf/GeoLiteCity.dat"
        }
        if ([status] == "Accepted") {
            mutate {
            add_tag => ["Success"]
            }
        }
        else if ([status] == "Failed") {
            mutate {
            add_tag => ["Failed"]
            }
        }
    }
    output {
        stdout { codec => rubydebug }
        elasticsearch {
            hosts => "elk.test.com:9200"
            index => "sshd_log-%{+YYYY.MM}"
        }
    }

    PS:添加状态标签,便于Kibana 统计

    if ([status] == "Accepted") {              #判断字段[status]值,匹配[Accepted]
            mutate {
            add_tag => ["Success"]      #添加标签[Success]
            }
    }
    else if ([status] == "Failed") {           #判断字段[status]值,匹配[Failed]
            mutate {
            add_tag => ["Failed"]       #添加标签[Failed]
            }
    }

     

    3.3、详细图形展示如下:

    9.png

    四、ELK-vsftpd 日志分析

    4.1、创建logstash grok 过滤规则

    #cat ./logstahs/patterns/vsftpd

    VSFTPDCONNECT [pid %{WORD:pid}] %{WORD:action}: Client "%{DATA:IP}"
    VSFTPDLOGIN [pid %{WORD:pid}] [%{WORD:user}] %{WORD:status} %{WORD:action}: Client "%{DATA:IP}"VSFTPDACTION [pid %{DATA:pid}] [%{DATA:user}] %{WORD:status} %{WORD:action}: Client "%{DATA:IP}", "%{DATA:file}", %{DATA:bytes} bytes, %{DATA:Kbyte_sec}Kbyte/sec 

    4.2、创建logstash vsftpd配置文件

    #cat ./logstash/conf/vsftpd.conf

    input {
        file {
            type => "vsftpd_log"
            path => "/var/log/vsftpd.log"
        }
    }
    filter {
        if [type] == "vsftpd_log" {
            grok {
                match => { "message" => "%{VSFTPDACTION}" }
                match => { "message" => "%{VSFTPDLOGIN}" }
                match => { "message" => "%{VSFTPDCONNECT}" }
            }
        }
    }
    output {
        stdout { codec => rubydebug }
        elasticsearch {
            hosts => "elk.test.com:9200"
            index => "vsftpd_log-%{+YYYY.MM}"
        }
    }

    4.3、详细图形展示如下:

    10.png

    原创文章,作者:wubin,如若转载,请注明出处:http://www.178linux.com/17395

  • 相关阅读:
    CS224n, lec 10, NMT & Seq2Seq Attn
    CS231n笔记 Lecture 11, Detection and Segmentation
    CS231n笔记 Lecture 10, Recurrent Neural Networks
    CS231n笔记 Lecture 9, CNN Architectures
    CS231n笔记 Lecture 8, Deep Learning Software
    CS231n笔记 Lecture 7, Training Neural Networks, Part 2
    pytorch坑点排雷
    Sorry, Ubuntu 17.10 has experienced an internal error
    VSCode配置python插件
    tmux配置与使用
  • 原文地址:https://www.cnblogs.com/cheyunhua/p/8618315.html
Copyright © 2011-2022 走看看