
    ELK Supplement: Logstash

    Collecting Tomcat and Java logs with Logstash

    server.xml (configure the AccessLogValve pattern so access logs are written as JSON):
    <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
                   prefix="tomcat_access_log" suffix=".log"
                   pattern="{&quot;clientip&quot;:&quot;%h&quot;,&quot;ClientUser&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;AccessTime&quot;:&quot;%t&quot;,&quot;method&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;SendBytes&quot;:&quot;%b&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;partner&quot;:&quot;%{Referer}i&quot;,&quot;AgentVersion&quot;:&quot;%{User-Agent}i&quot;}"/>
    
    /etc/init.d/tomcat stop
    rm -rf /apps/tomcat/logs/*
    /etc/init.d/tomcat start
    tail -f tomcat_access_log.2019-08-19.log 
    {"clientip":"192.168.10.1","ClientUser":"-","authenticated":"-","AccessTime":"[19/Aug/2019:01:42:03 +0000]","method":"GET /testapp/ HTTP/1.1","status":"304","SendBytes":"-","Query?string":"","partner":"-","AgentVersion":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"}

    Verify that the log lines are valid JSON

    https://www.json.cn/
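
    The same check can be done locally; a minimal sketch using Python's standard json module (the log path follows the Tomcat example above):

    #!/usr/bin/env python
    # Validate one access-log line locally instead of pasting it into a website.
    import json

    line = open("/apps/tomcat/logs/tomcat_access_log.2019-08-19.log").readline()
    try:
        record = json.loads(line)
        print("valid JSON, keys: %s" % ", ".join(record.keys()))
    except ValueError as e:  # json decode errors subclass ValueError
        print("invalid JSON: %s" % e)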

    How to extract the IP from a log line

    [root@tomcat1 ~]# cat 1
    #!/usr/bin/env python
    #coding:utf-8
    
    data={"clientip":"192.168.10.1","ClientUser":"-","authenticated":"-","AccessTime":"[19/Aug/2019:01:42:03 +0000]","method":"GET /testapp/ HTTP/1.1","status":"304","SendBytes":"-","Query?string":"","partner":"-","AgentVersion":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"}
    ip=data["clientip"]
    print(ip)
    [root@tomcat1 ~]# python 1
    192.168.10.1
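
    In practice the record would not be hard-coded; every line of the access log can be parsed the same way. A sketch against the same log path as above:

    #!/usr/bin/env python
    # Pull the client IP out of each JSON access-log line.
    import json

    with open("/apps/tomcat/logs/tomcat_access_log.2019-08-19.log") as f:
        for line in f:
            line = line.strip()
            if line:
                print(json.loads(line)["clientip"])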

    Install Logstash on the Tomcat server to collect Tomcat and system logs

    Tomcat must already be deployed; then install and configure logstash.

    vim /etc/logstash/conf.d/tomcat-es.conf
    input{
      file{
        path => "/apps/tomcat/logs/tomcat_access_log.*.log"
        type => "tomcat-accesslog"
        start_position => "beginning"
        stat_interval => "3"
        codec => "json"
      }
    }
    
    output{
      if [type] == "tomcat-accesslog" {
        elasticsearch {
          hosts => ["192.168.10.100:9200"]
          index => "192.168.10.230-tomcat-accesslog-%{+YYYY.MM.dd}"
        }
      }
    }
    # test the pipeline in the foreground first, then run it as a service
    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/tomcat-es.conf
    systemctl start logstash
    
    [root@tomcat1 conf.d]# systemctl status logstash
    ● logstash.service - logstash
       Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled)
       Active: failed (Result: start-limit) since Mon 2019-08-19 10:31:22 CST; 3min 4s ago
      Process: 1292 ExecStart=/usr/share/logstash/bin/logstash --path.settings /etc/logstash (code=exited, status=1/FAILURE)
     Main PID: 1292 (code=exited, status=1/FAILURE)
    
    Aug 19 10:31:22 tomcat1 systemd[1]: Unit logstash.service entered failed state.
    Aug 19 10:31:22 tomcat1 systemd[1]: logstash.service failed.
    Aug 19 10:31:22 tomcat1 systemd[1]: logstash.service holdoff time over, scheduling restart.
    Aug 19 10:31:22 tomcat1 systemd[1]: Stopped logstash.
    Aug 19 10:31:22 tomcat1 systemd[1]: start request repeated too quickly for logstash.service
    Aug 19 10:31:22 tomcat1 systemd[1]: Failed to start logstash.
    Aug 19 10:31:22 tomcat1 systemd[1]: Unit logstash.service entered failed state.
    Aug 19 10:31:22 tomcat1 systemd[1]: logstash.service failed.
    
    vim /var/log/messages
    could not find java; set JAVA_HOME or ensure java is in PATH
    
    ln -sv /apps/jdk/bin/java /usr/bin/java
    
    [root@tomcat1 conf.d]# systemctl status logstash
    ● logstash.service - logstash
       Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled)
       Active: active (running) since Mon 2019-08-19 10:34:30 CST; 3s ago
     Main PID: 1311 (java)
       CGroup: /user.slice/user-0.slice/session-1.scope/system.slice/logstash.service
               └─1311 /bin/java -Xms300m -Xmx300m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:C...
    
    Aug 19 10:34:30 tomcat1 systemd[1]: Started logstash.
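
    Once logstash is running, the new index can be confirmed directly against elasticsearch; a sketch using only the Python standard library (the host is the one configured above):

    #!/usr/bin/env python
    # List matching indices and their document counts via the _cat API.
    import json
    try:
        from urllib.request import urlopen  # Python 3
    except ImportError:
        from urllib2 import urlopen         # Python 2

    resp = urlopen("http://192.168.10.100:9200/_cat/indices?format=json")
    for idx in json.loads(resp.read().decode("utf-8")):
        if "tomcat-accesslog" in idx["index"]:
            print(idx["index"], idx["docs.count"])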

    Create an index in Kibana

    [screenshots: creating the tomcat-accesslog index pattern in Kibana]

    Collecting Java logs

    https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html

    Deploy logstash on the elasticsearch server

    cat /etc/logstash/conf.d/java-es.conf
    input {
      stdin {
        codec => multiline {
          pattern => "^\["    # a line beginning with [ starts a new event
          negate => true      # true: act on lines that do NOT match the pattern
          what => "previous"  # merge such lines into the previous event; "next" would attach them to the following one
        }
      }
    }
    filter {
      # log filtering: filters placed here apply to all events; to filter only
      # one source, wrap the filter in a conditional on that input's type
    }
    output {
      stdout {
        codec => rubydebug
      }
    }
    

    Test that it starts correctly

    /usr/share/logstash/bin/logstash -f java-es.conf 
    WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
    Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
    [WARN ] 2019-08-19 05:56:58.580 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [INFO ] 2019-08-19 05:56:58.604 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.5.4"}
    [INFO ] 2019-08-19 05:57:04.238 [Converge PipelineAction::Create<main>] pipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
    [INFO ] 2019-08-19 05:57:04.875 [Converge PipelineAction::Create<main>] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x44c5ad22 run>"}
    The stdin plugin is now waiting for input:
    [INFO ] 2019-08-19 05:57:05.267 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    [INFO ] 2019-08-19 05:57:05.689 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9601}
    
    Test standard input and standard output
    Input:
    1234
    1234[1234]
    [233
    Output:
    {
          "@version" => "1",
              "tags" => [
            [0] "multiline"
        ],
              "host" => "tomcat1",
        "@timestamp" => 2019-08-19T05:58:13.616Z,
           "message" => "1234
    1234[1234]"
    }
    
    5678[90]12
    [333[444]66
    {
          "@version" => "1",
              "tags" => [
            [0] "multiline"
        ],
              "host" => "tomcat1",
        "@timestamp" => 2019-08-19T05:59:08.507Z,
           "message" => "[233
    5678[90]12"
    }
    
    [
    {
          "@version" => "1",
              "host" => "tomcat1",
        "@timestamp" => 2019-08-19T05:59:42.467Z,
           "message" => "[333[444]66"
    }
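
    The grouping rule can be reasoned about as follows: with negate => true and what => "previous", any line that does not start with [ is glued onto the event before it. A rough Python simulation of that rule (not the codec's actual implementation) reproduces the output above:

    # Simulate the multiline merge rule for pattern "^\[", negate true, what "previous".
    import re

    pattern = re.compile(r"^\[")

    def merge(lines):
        events, current = [], []
        for line in lines:
            if pattern.match(line) and current:  # a "[..." line starts a new event
                events.append("\n".join(current))
                current = []
            current.append(line)
        if current:  # the real codec holds the last event until the next "[" line or a timeout
            events.append("\n".join(current))
        return events

    print(merge(["1234", "1234[1234]", "[233", "5678[90]12", "[333[444]66"]))
    # -> ['1234\n1234[1234]', '[233\n5678[90]12', '[333[444]66']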

    Change the output to elasticsearch

    cat java-es.conf 
    input {
      file {
        path => "/tmp/cluster-e.log"
        type => "javalog"
        start_position => "beginning"
        codec => multiline {
          pattern => "^\["
          negate => true
          what => "previous"
        }
      }
    }
    
    output {
      if [type] == "javalog" {
        elasticsearch {
          hosts => ["192.168.10.100:9200"]
          index => "192.168.10.230-javalog-%{+YYYY.MM.dd}"
        }
      }
    }

    Add the index

    [screenshots: creating the javalog index pattern in Kibana]

    Generate data

    cp cluster-e.log 1
    cat 1 >> cluster-e.log    # append a copy of the log to itself to produce fresh lines

    View the data in the Kibana UI

    [screenshots: javalog events in Kibana]

    Collecting TCP/UDP logs

    Collect logs through Logstash's tcp/udp input plugins. This is typically used to backfill logs that are missing from elasticsearch: the lost entries can be written straight to the elasticsearch server through a TCP port.

    Logstash configuration file; run a collection test first

    cat tcp-es.conf 
    input {
      tcp {
        port => 2333
        type => "tcplog"
        mode => "server"  
      }
    }
    
    output {
      stdout {
        codec => "rubydebug"
      }
    }

    Verify the port is listening

    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/tcp-es.conf
    ss -tnl|grep 2333

    Install the nc command on another server:

    NetCat, nc for short, is known as the "Swiss Army knife" of network tools. It is a simple, reliable utility that reads and writes data over TCP or UDP and offers many other functions besides.

    yum install nc -y
    echo "nc test tcplog"|nc 192.168.10.230 2333
    Verify that logstash received the data:
    {
              "host" => "192.168.10.230",
           "message" => "nc test tcplog",
              "type" => "tcplog",
        "@timestamp" => 2019-08-19T06:28:56.464Z,
              "port" => 48222,
          "@version" => "1"
    }
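
    If nc is not available, the same smoke test can be sent with Python's standard socket module (a sketch; host and port are the ones configured above):

    #!/usr/bin/env python
    # Rough equivalent of: echo "nc test tcplog" | nc 192.168.10.230 2333
    import socket

    sock = socket.create_connection(("192.168.10.230", 2333), timeout=5)
    sock.sendall(b"python test tcplog\n")  # the tcp input splits events on newlines
    sock.close()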

    Send a file with the nc command

    nc 192.168.10.230 2333 < /etc/passwd
    Verify that logstash received the data:
    {
              "host" => "192.168.10.230",
           "message" => "ftp:x:14:50:FTP User:/var/ftp:/sbin/nologin",
              "type" => "tcplog",
        "@timestamp" => 2019-08-19T06:30:39.686Z,
              "port" => 48224,
          "@version" => "1"
    }

    Change the output to elasticsearch

    cat tcp-es.conf 
    input {
      tcp {
        port => 2333
        type => "tcplog"
        mode => "server"
      }
    }
    
    output {
      elasticsearch {
        hosts => ["192.168.10.100:9200"]
        index =>  "192.168.10.230-tcplog-%{+YYYY.MM.dd}"
      }
    }
    
    systemctl  restart logstash

    Add the index in the Kibana UI

    Import data: nc 192.168.10.230 2333 < /etc/passwd
    
    [screenshots: creating the tcplog index pattern in Kibana]

    Verify the data

    [screenshots: tcplog events in Kibana]

    Collecting HAProxy logs via rsyslog

    On CentOS 6 and earlier the service was called syslog; from CentOS 7 onward it is rsyslog. According to the official introduction, rsyslog (as of 2013) can forward logs at a rate on the order of one million messages per second. Official site: http://www.rsyslog.com/

    vim /etc/haproxy/haproxy.cfg
    log         127.0.0.1 local2
    listen stats
     mode http
     bind 0.0.0.0:8888
     stats enable
     log global
     stats uri     /hastatus
     stats auth    admin:admin
    vim /etc/rsyslog.conf 
    # Provides UDP syslog reception
    $ModLoad imudp
    $UDPServerRun 514
    
    # Provides TCP syslog reception
    $ModLoad imtcp
    $InputTCPServerRun 514
    
    local2.* @@192.168.10.230:514    # @@ forwards over TCP; a single @ would use UDP

    Restart the haproxy and rsyslog services

    systemctl restart haproxy rsyslog
    vim rsyslog-es.conf
    input{
      syslog {
        type => "rsyslog"
        port => "514"
      }
    }
    
    output{
      elasticsearch {
        hosts => ["192.168.10.100:9200"]
        index => "192.168.10.230-rsyslog-%{+YYYY.MM.dd}"
      }
    }

    systemctl restart logstash
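
    The listener can also be checked without touching haproxy by sending a test message with Python's standard logging.handlers.SysLogHandler (a sketch; it uses UDP by default, which the syslog input also accepts; the bare message lacks a full syslog header, so logstash may tag it _grokparsefailure, but it still confirms connectivity):

    #!/usr/bin/env python
    # Send one test message to the logstash syslog input over UDP 514.
    import logging
    from logging.handlers import SysLogHandler

    logger = logging.getLogger("rsyslog-test")
    logger.setLevel(logging.INFO)
    logger.addHandler(SysLogHandler(address=("192.168.10.230", 514)))
    logger.info("syslog test from python")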

    Create the index

    [screenshots: creating the rsyslog index pattern in Kibana]

    Verify the data in Kibana

    [screenshots: rsyslog events in Kibana]

    Collecting logs with Logstash and writing them to Redis

    Deploy the redis service on one or more dedicated servers as a log cache. This suits scenarios where web servers generate large volumes of logs; for example, a server whose memory is nearly exhausted may turn out to be holding a large amount of unread data in redis. The pipeline is:
    log file >> logstash >> redis >> logstash >> elasticsearch >> kibana

    Install Redis

    yum install redis -y

    Configure Redis

    grep "^[a-zA-Z]" /etc/redis.conf
    bind 0.0.0.0
    port 6379
    daemonize yes        # run in the background
    save ""
    requirepass password # set the redis access password
    rdbcompression no    # disable RDB compression
    rdbchecksum no       # disable RDB checksumming

    Verify Redis

    [root@Final conf.d]# redis-cli 
    127.0.0.1:6379> KEYS *
    (error) NOAUTH Authentication required.
    127.0.0.1:6379> keys *
    (error) NOAUTH Authentication required.
    127.0.0.1:6379> auth password
    OK
    127.0.0.1:6379> keys *
    (empty list or set)
    127.0.0.1:6379> select 1
    OK
    127.0.0.1:6379[1]> keys *
    (empty list or set)
    127.0.0.1:6379[1]> keys *    (later, after the logstash writer below has run)
    1) "redis-nginx-accesslog"

    Configure logstash to write logs to redis

    [root@logstash1 conf.d]# cat redis-es.conf 
    input {
      file {
        path => "/usr/local/nginx/logs/access_json.log"
        type => "nginx-accesslog"
        start_position => "beginning"
        stat_interval => "3"
        codec => "json"
      }
    }
    
    output {
      if [type] == "nginx-accesslog" {
        redis {
          host => "192.168.10.254"
          port => 6379
          password => "password"
          key => "redis-nginx-accesslog"
          db => 1
          data_type => "list"
        }
      }
    }
    
    systemctl restart logstash

    Configure another logstash server to read the data back from redis

    [root@tomcat1 conf.d]# cat redis-es.conf 
    input {
      redis {
        data_type => "list"
        key => "redis-nginx-accesslog"
        host => "192.168.10.254"
        port => "6379"
        db => "1"
        password => "password"
        codec => "json"
      }
    }
    
    output {
      if [type] == "nginx-accesslog" {
        elasticsearch {
          hosts => ["192.168.10.100:9200"]
          index => "redis-nginx-accesslog-%{+YYYY.MM.dd}"
        }
      }
    }
    
    systemctl restart logstash

    Verify whether redis holds data

    127.0.0.1:6379[1]> keys *
    1) "redis-nginx-accesslog"
    127.0.0.1:6379[1]> LLEN redis-nginx-accesslog     # if the value stays greater than 0, the consuming logstash has a problem
    (integer) 2
    127.0.0.1:6379[1]> rpop redis-nginx-accesslog
    "{"status":"sstatus","type":"nginx-accesslog","host":"192.168.10.102","upstreamhost":"-","upstreamtime":"-","xff":"shttp_x_forwarded_for","clientip":"192.168.10.254","@timestamp":"2019-08-20T01:43:46.000Z","aomain":"192.168.10.102","size":6,"path":"/usr/local/nginx/logs/access_json.log","@version":"1","ei":"suri","referer":"-","tcp_xff":"","responsetime":0.0,"http_host":"192.168.10.102","http_user_agent":"http_user_agent"}"
    127.0.0.1:6379[1]> LLEN redis-nginx-accesslog
    (integer) 0
    
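    The same checks can be scripted; a sketch assuming the third-party redis-py package (pip install redis) and the connection details configured above:

    #!/usr/bin/env python
    # Check the queue depth and peek at the newest event without consuming it.
    import redis

    r = redis.StrictRedis(host="192.168.10.254", port=6379, db=1,
                          password="password")
    print(r.llen("redis-nginx-accesslog"))        # backlog size; should drain toward 0
    print(r.lindex("redis-nginx-accesslog", -1))  # newest item (logstash RPUSHes), non-destructive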

    Add the log index

    [screenshots: creating the redis-nginx-accesslog index pattern in Kibana]

    Verify the logs

    [screenshots: redis-nginx-accesslog events in Kibana]

    Collecting logs with Logstash and writing them to Kafka

    Configure logstash to write logs to kafka

    [root@logstash1 conf.d]# cat kafka-es.conf 
    input {
      file {
        path => "/usr/local/nginx/logs/access_json.log"
        type => "nginx-accesslog"
        start_position => "beginning"
        stat_interval => "3"
        codec => "json"
      }
    }
    
    output {
      if [type] == "nginx-accesslog" {
        kafka {
          bootstrap_servers => "192.168.10.211:9092"
          topic_id => "kafka-nginx-accesslog"
          codec => "json"
        }
      }
    }
    
    systemctl restart logstash

    Configure another logstash server to read the data from kafka

    cat kafka-es.conf 
    input {
      kafka {
        bootstrap_servers => "192.168.10.211:9092"
        topics => "kafka-nginx-accesslog"
        codec => "json"
      }
    }
    
    output {
      if [type] == "nginx-accesslog" {
        elasticsearch {
          hosts => ["192.168.10.100:9200"]
          index => "kafka-nginx-accesslog-%{+YYYY.MM.dd}"
          codec => "json"
        }
      }
      # stdout {
      #   codec => "rubydebug"
      # }
    }
    
    systemctl restart logstash
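
    To confirm that events are actually reaching the topic, independent of the consuming logstash, the topic can be tailed directly; a sketch assuming the third-party kafka-python package (pip install kafka-python):

    #!/usr/bin/env python
    # Tail the kafka topic and print a couple of fields from each event.
    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "kafka-nginx-accesslog",
        bootstrap_servers="192.168.10.211:9092",
        auto_offset_reset="earliest",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for message in consumer:
        print(message.value.get("clientip"), message.value.get("status"))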

    Generate logs

    while true;do curl http://192.168.10.102;sleep 1;done

    Add the log index

    [screenshots: creating the kafka-nginx-accesslog index pattern in Kibana]

    Verify the logs

    [screenshots: kafka-nginx-accesslog events in Kibana]
