zoukankan      html  css  js  c++  java
  • ELK日志收集分析系统配置

    ELK是日志收益与分析的利器。

    1、elasticsearch集群搭建

    2、logstash日志收集

    我这里的实现分如下2步,中间用redis队列做缓冲,可以有效的避免es压力过大:

    1、n个agent对n个服务的log做日志收集(1对1的方式),从日志文件解析数据,存入broker,这里用的是redis的发布订阅模式的消息队列,当然你可以选用kafka,redis比较方便;

    2、indexer做日志汇总,从redis队列中拿数据入es;

    下面给出agent和index的配置示例:

    1、driver_schedule.conf

    input {
      file {
          #这是日志路径
          path => [
              "/home/xiaoju/driver-schedule-api/logs/driver-schedule-api.info.*",
              "/home/xiaoju/driver-schedule-api/logs/driver-schedule-api.error.*"
            ]
          #排除路径,支持glob展开,但是不递归
          exclude => [
              "access.*"
          ]
          #开始位置,beginning从日志开始读取
          start_position => "beginning"
          #sincedb指示的文件,记录日志读取位置
          sincedb_path => "/home/xiaoju/yangfan/local/logstash-1.4.2/sincedb/driver_schedule_progress"
          #添加记录字段
          add_field => {
              "server" => "driver_schedule"
          }
          #编码器,正则pattern多行合并
          codec => multiline {
              pattern => "^d+:d+"
              negate => true
              what => "previous"
          }
      }
    }
    
    filter {
        #匹配路径中包涵info
        if [path] =~ "info" {
            #mutate更改值
            mutate {
                replace => { "type" => "info" }
            }
            grok {
                match => { "message" => "%{COMBINEDAPACHELOG}" }
            }
        }else if [path] =~ "error" {
            mutate {
                replace => { "type" => "error" }
            }
        } else {
            mutate { replace => { "type" => "unknow" } }
        }
    
        date {
            match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
        }
    }
    
    output {
        #debug格式化打印
        #stdout{codec => rubydebug}
        redis {
            host => "10.94.99.55"
            #这里用的是redis订阅模式,对应indexer的data_type应是pattern_channel
            data_type => "channel"
            port => 6379
            db => 5
            key => "logstash:%{[server]}"
        }
    }

    run起来:

    nohup ./bin/logstash -f ./conf/agent/driver_schedule.conf &

    2、indexer.conf

    input {
        redis {
            host => "10.94.99.55"
            port => 6379
            db => 5
            #如果这里选择了pattern_channel, 采用的是redis的订阅方式, agent里data_type就要对应channel
            data_type => "pattern_channel"
            #这是所有的key的匹配pattern
            key => "logstash:*"
        }
    }
    
    output {
        elasticsearch {
            embedded => false
            protocol => "http"
            host => "10.94.99.56"
            port => 9211
            #配置es索引名字
            index => "%{[server]}"
            #配置es索引类型
            index_type => "%{[type]}"
        }
        #debug使用, 格式化打印
        #stdout{codec => rubydebug}
    }

    run起来:

    nohup ./bin/logstash -f ./conf/indexer/indexer.conf &

    3、kibana配置

    网上教程比较多,这里我只mark一些问题的解决方法:

    1、connection failure: 

    checklist:

    1、配置kibana的config.js里的es地址

    2、如果es版本>1.4则需要在es的配置里加入

    http.cors.allow-origin: "/.*/"

    http.cors.enabled: true

     

    注意事项:

    1、ES和logstash最好选用相同大版本,不然可能写不进去

    2、logstash会写一个syncsys的文件,记录上次读取文件到什么地方

  • 相关阅读:
    LeetCode 139. Word Break
    Amazon behavior question
    学习笔记之100 TOP Ikm C++ Online Test Questions
    学习笔记之IKM C++ 11
    学习笔记之C/C++指针使用常见的坑
    LeetCode 208. Implement Trie (Prefix Tree)
    队列 & 栈//岛屿的个数
    队列 & 栈//设计循环队列
    队列 & 栈//设计循环队列
    查找表类算法//存在重复元素 III
  • 原文地址:https://www.cnblogs.com/kangoroo/p/6053828.html
Copyright © 2011-2022 走看看