zoukankan      html  css  js  c++  java
  • ELK 日志系统搭建配置

      logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层。

          首先将数据传给logstash,它将数据进行过滤和格式化(转成JSON格式),然后传给Elasticsearch进行存储、建搜索的索引,kibana提供前端的页面再进行搜索和图表可视化,它是调用Elasticsearch的接口返回的数据进行可视化。logstash和Elasticsearch是用Java写的,kibana使用node.js框架。

          这个软件官网有很详细的使用说明,https://www.elastic.co/,除了docs之外,还有视频教程。这篇博客集合了docs和视频里面一些比较重要的设置和使用。

    一、logstash的配置

    1. 定义数据源

          写一个配置文件,可命名为logstash.conf,输入以下内容:

    input {
            file {
                    path => "/data/web/logstash/logFile/*/*"
                    start_position => "beginning" #从文件开始处读写
            }
    #       stdin {}  #可以从标准输入读数据
    }

          定义的数据源,支持从文件、stdin、kafka、twitter等来源,甚至可以自己写一个input plugin。如果像上面那样用通配符写file,如果有新日志文件拷进来,它会自动去扫描。

    2. 定义数据的格式

          根据打日志的格式,用正则表达式进行匹配

    filter {
    
      #定义数据的格式
      grok {
        match => { "message" => "%{DATA:timestamp}|%{IP:serverIp}|%{IP:clientIp}|%{DATA:logSource}|%{DATA:userId}|%{DATA:reqUrl}|%{DATA:reqUri}|%{DATA:refer}|%{DATA:device}|%{DATA:textDuring}|%{DATA:duringTime:int}||"}
      }
    
    }

          由于打日志的格式是这样的:

    2015-05-07-16:03:04|10.4.29.158|120.131.74.116|WEB|11299073|http://quxue.renren.com/shareApp?isappinstalled=0&userId=11299073&from=groupmessage|/shareApp|null|Mozilla/5.0 (iPhone; CPU iPhone OS 8_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12D508 MicroMessenger/6.1.5 NetType/WIFI|duringTime|98||

          以|符号隔开,第一个是访问时间,timestamp,作为logstash的时间戳,接下来的依次为:服务端IP,客户端的IP,机器类型(WEB/APP/ADMIN),用户的ID(没有用0表示),请求的完整网址,请求的控制器路径,reference,设备的信息,duringTime,请求所花的时间。

         如上面代码,依次定义字段,用一个正则表达式进行匹配,DATA是logstash定义好的正则,其实就是(.*?),并且定义字段名。

         我们将访问时间作为logstash的时间戳,有了这个,我们就可以以时间为区分,查看分析某段时间的请求是怎样的,如果没有匹配到这个时间的话,logstash将以当前时间作为该条记录的时间戳。需要再filter里面定义时间戳的格式,即打日志用的格式:

    filter {
    
      #定义数据的格式
      grok {#同上... }
    
      #定义时间戳的格式
      date {
        match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
        locale => "cn"
      }
    
    }

         在上面的字段里面需要跟logstash指出哪个是客户端IP,logstash会自动去抓取该IP的相关位置信息:

    filter {
    
      #定义数据的格式
      grok {#同上}
    
      #定义时间戳的格式
      date {#同上}
    
      #定义客户端的IP是哪个字段(上面定义的数据格式)
      geoip {
        source => "clientIp"
      }
    }

         同样地还有客户端的UA,由于UA的格式比较多,logstash也会自动去分析,提取操作系统等相关信息

      #定义客户端设备是哪一个字段
      useragent {
        source => "device"
        target => "userDevice"
      }

          哪些字段是整型的,也需要告诉logstash,为了后面分析时可进行排序,使用的数据里面只有一个时间

      #需要进行转换的字段,这里是将访问的时间转成int,再传给Elasticsearch
      mutate {
        convert => ["duringTime", "integer"]
      }

    3. 输出配置

          最后就是输出的配置,将过滤扣的数据输出到elasticsearch

    output {
      #将输出保存到elasticsearch,如果没有匹配到时间就不保存,因为日志里的网址参数有些带有换行
      if [timestamp] =~ /^d{4}-d{2}-d{2}/ {
            elasticsearch { host => localhost }
      }
    
       #输出到stdout
    #  stdout { codec => rubydebug }
    
       #定义访问数据的用户名和密码
    #  user => webService
    #  password => 1q2w3e4r
    }

          我们将上述配置,保存到logstash.conf,然后运行logstash

          在logstash启动完成之后,输入上面的那条访问记录,logstash将输出过滤后的数据:

         可以看到logstash,自动去查询IP的归属地,并将请求里面的device字段进行分析。

    二、配置Elasticsearch和kibana

    1. Elasticsearch

          这个不需要怎么配,使用默认的配置即可。配置是: config/elasticsearch.yml

          如果需要设置数据的过期时间,可以加上这两行(目测是这样配的,没有验证过,读者可以试一下):

    #设置为30天过期
    
    indices.cache.filter.expire: 30d
    
    index.cache.filter: 30d

          Elasticsearch默认监听在9200端口,可对其进行查询和管理,例如看索引的健康状态:

    curl 'localhost:9200/_cluster/health?level=indices&pretty'

          输出

    {
      "cluster_name" : "elasticsearch",
      "status" : "yellow",
      "timed_out" : false,
      "number_of_nodes" : 2,
      "number_of_data_nodes" : 1,
      "active_primary_shards" : 161,
      "active_shards" : 161,
      "relocating_shards" : 0,
      "initializing_shards" : 0,
      "unassigned_shards" : 161,
      "number_of_pending_tasks" : 0,
      "indices" : {
        "logstash-2015.05.05" : {
          "status" : "yellow", #有三级,green, yellow和red
          "number_of_shards" : 5,
          "number_of_replicas" : 1,
          "active_primary_shards" : 5,
          "active_shards" : 5,
          "relocating_shards" : 0,
          "initializing_shards" : 0,
          "unassigned_shards" : 5
        }
    }

          可在浏览器进行访问,例如查询一下使用chrome浏览器情况:

    2. kibana

          这个也不用配置,默认监听在5601端口。

    #让它运行在后台
    localhost# nohup bin/kibana &

          注意以上两个要使用Java 7以上版本,还有小版本要求,下一个最新的Java 8即可,然后在启动脚本里export 

  • 相关阅读:
    iptables详解
    Python中的Subprocess模块
    Logging模块
    python inspect.stack() 的简单使用
    python之inspect模块
    python之platform模块
    GlusterFS分布式存储学习笔记
    AD 域服务简介(一)- 基于 LDAP 的 AD 域服务器搭建及其使用
    LDAP概念和原理介绍
    文件传输协议FTP、SFTP和SCP
  • 原文地址:https://www.cnblogs.com/daxiongblog/p/5912708.html
Copyright © 2011-2022 走看看