zoukankan      html  css  js  c++  java
  • ELK

    Demo 跑起来之后,就需要根据具体的负载和日志进行优化了,本次主要是优化在 Kibana 界面中 [Table] 展开的 Patterns,过多的 Patterns 有几个负面作用:
    1)、干扰查看信息
    2)、增大索引占用空间
    3)、降低 es 的写入性能

    ELK各组件版本:(Windows Server, Linux 下其他大同小异)
    1)、Filebeat - 7.3.1
    2)、Logstash - 7.3.1
    3)、Elasticsearch: 7.3.1
    4)、Kibana - 7.3.1

    以下是优化前的内容,实际需要的仅仅是6个字段,但此处却有35个字段之多,主要来源是 filebeat 和 logstash 处理时应用了 es 默认模板产生的大量冗余字段。
    这里的结果是已经在 logstash 中初步配置了 remove_field,将 "@version"、"message" 、"log_create_time"  移除了,不然整个 Table 展开后更加的臃肿。
    @version 和 message 是 es 的动态模板自动创建的,log_create_time 是自定义的字段,用于替换默认模板的 @timestamp

               

    Kibana -> Setting -> Index Patterns

    日志样例:

    复制代码
    2019-11-21 08:45:39.656 | ^Warn||ThreadID: 35|Audilog not defined in config.
    2019-11-21 08:47:39.012 | *Error|SQLServer||ThreadID: 35|Database connection refused.
    2019-11-21 09:42:50.156 | Info|Kafka||ThreadID: 35|Reviced Message:
    ID:                                  1
    Mode:                                Delay
    Message Id:                          1
    Map:                                 len=45
    Binary Version:                      3.4.6
    复制代码

     以 “|” 为分隔符,将日志分为 6 个 field,可以看到有的只有5段,有的不在一行:
    field1 - logtime
    field2 - level
    field3 - comp // 此字段可能为空
    field4 - blank // 此处都是 null
    field5 - threadId
    field6 - logbody

    1、先看一下优化后的配置文件

    Filebeat 的配置文件,如果要收集一台服务器上的不同程序的日志,可以新建多个配置文件并配置不同的 logstash 端口,再启动多个进程收集。

    如果是输出到 kafka,则可以在一份配置里配置多个 intput 利用 filed 字段在 output 里送往不同的 topic,而不必启动多个进程。

    • filebeat.yml
    复制代码
    # log files input
    filebeat.inputs:
    - type:
      enabled: true
      paths:
        - F:payserverlog*.log
      multiline.pattern: '[0-9]{4}-[0-9]{2}-[0-9]{2]' // 将不在一行的日志,拼接到以日期开头的行后
      multiline.negate:  true
      multiline.match: after
      fields:
        nginx: payserver
      scan_frequency: 10s
      max_bytes: 1048576
      tail_files: false
      backoff: 1s
      backoff_factor: 2
    
    # output to logstash
    output.logstash:
      hosts: ["172.16.0.11:5146"]
    
    processors:
    - drop_fields:
        fields: ["input_type", "log.offset", "host.name", "input.type", "agent.hostname", "agent.type", "ecs.version", "agent.ephemeral_id", "agent.id", "agent.version", "fields.ics", "log.file.path", "log.flags" ]
    
    monitoring:
      enabled: true
    elasticsearch: ["http://172.16.0.11:9200"]
    # logging logging.level: info logging.to_file: true logging.files: path: E:ELKfilebeat-7.3.1-windows-x86_64logs name: filebeat-5146.log interval: 1h keepfiles: 7 logging.json: false
    复制代码

    Logstash 的配置文件

    当单机有多个程序日志需要收集并且输出端是 Logstash 时,有几种方式:
    1)、使用 if [filed] = 'xxx' 来区分 grok 和 output,此种方式 Logstash 需要做大量的 if 判断,官方称之为 Conditional Hell (条件地狱),会严重降低 grok 效率 - 不推荐
    2)、启动多个 Logstash 实例,需要配置和管理多个 JVM - 不推荐
    3)、使用 Logstash 的 Pipeline , 不必管理多个 JVM,也不必做大量 if 判断,以下的配置使用的就是该种方式

    •  Configlogstash.yml
    复制代码
    pipeline.worker: 10
    pipeline.batch.size: 3000
    pipeline.batch.delay: 10
    
    http.host: "172.16.0.11"
    xpack.monitoring.enabled: true
    xpack.monitoring.elasticsearch.hosts: ["http://172.16.0.11:9200"]
    
    log.level: info
    复制代码
    • Configpipeline.yml
    复制代码
    - pipeline: main
    pipeline.workers: 8
    pipeline.batch.size: 3000
    pipeline.delay: 200 path.config: E:\ELK\logstash-7.3.1\pipeline\5044-server01-payapi.conf - pipeline: server-01-error
    pipeline.workers: 8
    pipeline.batch.size: 3000
    pipeline.delay: 200 path.config: E:\ELK\logstash-7.3.1\pipeline\5045-server01-payweb.conf - pipeline: server-02-access
    pipeline.workers: 8
    pipeline.batch.size: 3000
      pipeline.delay: 200 path.config: E:\ELK\logstash-7.3.1\pipeline\5046-server02-payapi.conf - pipeline: server-02-error
    pipeline.workers: 8
    pipeline.batch.size: 3000
    pipeline.delay: 200
    path.config: E:\ELK\logstash-7.3.1\pipeline\5047-server02-payweb.conf
    复制代码
    • Pipeline5044-server01-payapi.conf
    复制代码
    input {
      beats {
        port => 5044 // 当配置了Pipeline时,不同的 pipeline 配置文件,此端口不可冲突
         client_inactivity_timeout => 600
      }   
    }
    
    filter {
      grok {
        match => {
          "message" => "%{DATA:logtime}|%{DATA:level}|%{DATA:comp}|%{DATA:blank}|%{DATA:threadId}|%{GREEDYDATA:logdoby}"
         }
      }
    
      if "_grokparsefailure" in [tags] { // 某些行按 "|" 分割只有5段,按前面的 grok 会解析失败,并生成一个值为 "_grokparsefailure" 的 tag,此处重新解析失败的行
        grok {
          match => {
            "message" => "%{DATA:logtime}|%{DATA:level}|%{DATA:blank}|%{DATA:threadId}|%{GREEDYDATA:logdoby}"
          }
        }
      }

    grok {
      match => {
    "message" => "%{TIMESTAMP_ISO8601:log_create_time}" // 将日志的时间按照 TIMESTAMP_ISO8601 解析给临时变量 log_create_time
    }
    }

    date {
    match => ["log_create_time", "yyyy-MM-dd HH:mm:ss.SSS"] // 按时间格式匹配一下
    target => "@timestamp" // 将 log_create_time 写入 @timestamp
    }
    mutate {
    remove_field => "@version"
    remove_field => "message"
    remove_field => "log_create_time"
    remove-field => "tags"
    gsub => ["level", "s", ""] // 移除字段中的空格
    gsub => ["comp", "s", ""]  
    }
    }

    output {
      elasticsearch {
        hosts => ["http://172.16.0.11:9200"]
        index => "payapi-server01-%{+yyyy.MM.dd}"
        manage_template => false // 取消 logstash 自动管理模板功能
        template_name => template_payapi // 映射自定义模板的名字,自定义模板的创建在下方
      }
    }
    复制代码

    其他的几个配置文件类似,此处注意几点:

    1)、@timestamp 默认是 logstash 处理日志时的时间,当日志的生成时间和 logstash 的处理时间较为接近时问题不大;但假如你要索引几个月前的文档或者日志,此时这个时间差几乎就不能接受;所以此处新建了一个临时变量 log_create_time,再使用 date 插件,将其写入 @timestamp,最后@timestamp 就等于日志的生成时间了,在这个例子里,应该明白 @timestamp、logtime、log_create_time 三者是相同的。

    2)、没有使用 logstash 默认的索引模板,使用的是自定义的索引模板,在 Kibana 的 Console 中新建模板:

    复制代码
    PUT /_template/template_payapi
    {
      "index_patherns" : "[payapi-*]", // 以 payapi- 开头的索引均会应用此模板
      "order" : 99, // 模板的优先级
      "settings" : {
        "number_of_shards" : 1, // 索引使用的分片数量
        "number_of_replicas" : 0, // 索引的副本数,当你需要导入大量数据,第一次建立索引时,可以设置为0,提高写入速度,导入完成后可以动态修改
        "refresh_interval" : "15s" // 向临时空间刷写的频率,类似于硬盘的 fsync
      },
      "mapping" : {
        "dynamic" : "false", // 看下方解释
        "properties" : {
          "@timestamp" : {
            "type" : "text"
          },
          "logtime" : {
            "type" : "text", // 这里注意,不是所有时间都是 date
            "index" : "false" // true:字段可用于搜索, false: 不能用于搜索
          },
          "level" : {
            "type" : "text",
            "index" : "true"
          },
          "comp" : {
            "type" : "text",
            "index" : "true"
          },
          "blank" : {
            "type" : "text",
            "index" : "false"
          },
          "threadId" : {
            "type" : "text",
            "index" : "true"
          },
          "logbody" : {
            "type" : "text",
            "index" : "ture"
          }
        }
      }
    }
    复制代码
    dynamic 值
    说明
    true 在索引一个文档时,如果文档中有 field 不在 mapping 中,会自动映射类型,添加到 mapping,并索引该字段;
    false

    在索引一个文档时,如果文档中有 field 不在 mapping 中,不会添加到 mapping,也不会索引该字段,

    但是会保存下来,在 _source 可以看到该字段,但该字段不能被搜索;

    strict 在索引一个文档时,如果文档中有 field 不在 mapping 中,logstash 会直接抛出异常,拒绝索引;




    3)、查看模板,在 Kibana 的 Console 中执行

    GET /_template/template_payapi

    2、最后看下优化之后的效果,仅保留需要的字段

       

    Kibana -> Setting -> Index Patterns

     仅保留不能被删除的保留字段和自己需要的字段,同时 stroage 占用和写入速度也更加的 stable。

    参考链接:
    https://blog.csdn.net/shumoyin/article/details/84137178
    https://www.jianshu.com/p/dc73ec69c9f7

  • 相关阅读:
    使用RoboCopy 命令[转载]
    取得超级管理员权限
    重置网络命令win7
    ASP.NET Global.asax详解【转】
    逆波兰式算法
    设计模式【转自JackFrost的博客】
    VS2013 F12无法转到函数的定义处,总是从“元数据”获取的问题 ——解决方法
    扩展方法 C#
    委托Func和Action【转】
    添加路由
  • 原文地址:https://www.cnblogs.com/cheyunhua/p/14709109.html
Copyright © 2011-2022 走看看