zoukankan      html  css  js  c++  java
  • ElasticSearch实战系列七: Logstash实战使用-图文讲解

    前言

    在上一篇中我们介绍了Logstash快速入门,本文主要介绍的是ELK日志系统中的Logstash的实战使用。实战使用我打算从以下的几个场景来进行讲解。

    时区问题解决方案

    在我们使用logstash将采集的数据传输到ES中的时候,会发现采集的时间@timestamp的时间和我们本地的不一致,这个主要是因为时区的问题导致的,我们在计算时间的时候需要将这个时间增加8小时,但是这样会很不方便。为了永久解决这个问题,我们可以在logstash中的filter中对该字段进行转换,增加8小时。

    添加的配置如下:

    
     ruby {
       code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
     }
     ruby {
       code => "event.set('@timestamp',event.get('timestamp'))"
     }
     mutate {
       remove_field => ["timestamp"]
     }
     
    

    原本示例图:
    在这里插入图片描述

    添加配置之后的示例图:
    在这里插入图片描述
    可以看到添加配置之后@timestamp时间已经和本地时间基本一致了。

    日志内容切分

    我们在进行采集日志到ES中的时候,有时需要对日志内容进行切割。比如得到日志内容的时间以及日志级别等等。这时我们就可以通过grok来对日志内容进行切分,比如将制定好的日志内容切割为日志时间、线程名称、日志级别、类名以及详细内容等等。我们只需要在logstash的filter中使用grok语法即可完成日志内容切割。
    这里我们使用JAVA的Logback来制定日志输出格式,然后通过日志的格式编写grok语法,最后将grok配置添加到logstash的filter中。

    Logback输出配置:

    |%d{yyyy-MM-dd HH:mm:ss.SSS}|[%thread]|%-5level|%logger{50}|-%msg%n

    日志样例数据:

    |2020-07-24 17:08:33.159|[Thread-5]|INFO|com.pancm.Application|-测试示例三: All things in their being are good for something. 天生我才必有用3

    grok模式:

    |%{DATA:log_time}|%{DATA:thread}|%{DATA:log_level}|%{DATA:class_name}|-%{GREEDYDATA:content}

    使用grok分析
    在这里插入图片描述
    可以看到以及分析匹配成功了。

    然后我们在filter中添加如下配置:

    grok {
    match => { "message" =>"|%{DATA:log_time}|%{DATA:thread}|%{DATA:log_level}|%{DATA:class_name}|-%{GREEDYDATA:content}"
    }
    }

    最终输出的日志到ES的示例图:

    在这里插入图片描述

    自定义模板

    我们在使用Logstash采集日志的时候,如果没有指定索引库或模板,则会使用ElasticSearch默认自带的名字为”logstash”的模板,默认应用于Logstash写入数据到ElasticSearch使用。但是我们希望使用自定义的索引模板,将采集的日志按照我们自身的想法来写入,此时我们就需要用到自定义模板了。
    主要有两种方式,一种是在logstash的output插件中使用template指定本机器上的一个模板json路径, 例如 template => "/home/logstash.json",json里面的内容为我们自定的索引mapping,虽然这种方式简单,但是分散在Logstash机器上,维护起来比较麻烦。还有一种是在elasticsearc服务端自定义配置模板,事先将模板设置好,然后在logstash的output输出中指定该模板即可,这种方式比较灵活方便,可动态更改,全局生效。
    这里我们还是通过一个示例来进行说明,我们首先创建一个template_mylog的模板,配置这几个字段:
    log_time、thread、log_level、class_name、content。

    语句如下:

    
    PUT _template/template_mylog
    {       
            "index_patterns" : [
                "mylog-*"
            ],
            "order" : 10,
              "settings": {  
                  "index.number_of_shards": 3,  
                  "number_of_replicas": 1
              },  
          "mappings" : {  
              "properties" : {  
                   "log_level" : { "type" : "keyword" },
                "thread" : { "type" : "keyword" },
                 "class_name" : { "type" : "keyword" },
                  "content" : { "type" : "keyword" },
                 "log_time" : {   "type" : "date","format" : "yyyy-MM-dd HH:mm:ss.SSS"}
              }  
             
          }  
      }
    
    

    示例图:
    在这里插入图片描述
    注:上述的配置比其他mapping而言多了两个新配置,一个是index_patterns,该配置表明自动创建的索引开头以mylog-的索引库都会采用该模板;而order表示顺序级别,在有相同的索引模板中,该值越大,优先级越高。

    创建成功之后,我们只需在output中的添加如下配置即可。

     
     elasticsearch {
                hosts => ["127.0.0.1:9200"]
                index => "mylog-%{+YYYY.MM.dd}"  
        }
        
    

    然后我们启动logstash进行日志的采集。
    效果图:

    在这里插入图片描述

    写入多个索引库

    我们在使用logstash采集日志的时候,有时有多种不同的日志并且需要采集到不同的索引库中,这时我们就可以通过标记来进行写入。比如采集/home/logs目录下的日志我定义一个标记为java,采集/home/logs2目录下的日志我定义一个标记为java2,那么在写入ElasticSearch的时候只需要根据该标记区分写入即可。

    logstash input配置示例:

    
      file {
            path => ["/home/logs/mylog-2020-08-13.0.txt"]
            type => "java"
            start_position => "beginning"
            sincedb_path => "/dev/null"
        }
        file {
            path => ["/home/logs2/*.txt"]
            type => "java2"
            start_position => "beginning"
            sincedb_path => "/dev/null"
        }
    

    logstash output配置示例:

    
        if [type] == "java"{
          elasticsearch {
             hosts => ["127.0.0.1:9200"]
             index => "mylog-%{+YYYY.MM.dd}"
          }
        }
    
        if [type] == "java2"{
          elasticsearch {
             hosts => ["127.0.0.1:9200"]
             index => "mylog-%{+YYYY.MM}"
          }
        }
        
    

    示例图在多行内容合并场景中。

    多行内容合并

    我们在采集日志的时候,经常会遇到异常日志,并且异常日志并非为一行内容,如果我们按照原有的方式采集,在ElasticSearch中显示的是一行一行的内容,这样的话我们排查问题会很头疼。幸好Logstash中支持多行日志合并,使用multiline.pattern、multiline.negate和multiline.what来实现配置实现。
    下面的配置中,我们通过制定匹配规则将以空格开头的所有行合并到上一行,并把以Caused by开头的也追加到上一行。
    在Logstash的input配置中添加如下配置:

    
     codec => multiline {
              pattern => "s*["
              negate => "true"
              what => "previous"
            }
            
    

    异常日志:
    在这里插入图片描述
    原异常日志在ElasticSearch中示例图:
    在这里插入图片描述

    多行合并之后的效果图:
    在这里插入图片描述

    完整配置

    logstash-test.conf 配置

    input{
        file {
            path => ["/home/logs/mylog-2020-08-13.0.txt"]
            type => "java"
            start_position => "beginning"
            sincedb_path => "/dev/null"
        }
        file {
            path => ["/home/logs2/*.txt"]
            type => "java2"
            codec => multiline {
              pattern => "s*["
              negate => "true"
              what => "previous"
            }
            start_position => "beginning"
            sincedb_path => "/dev/null"
        }
    }
    
    filter {
    
       grok {
             match => { "message" =>"|%{DATA:log_time}|%{DATA:thread}|%{DATA:log_level}|%{DATA:class_name}|-%{GREEDYDATA:content}" }       
        }
    
      ruby {
       code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
     }
     ruby {
       code => "event.set('@timestamp',event.get('timestamp'))"
     }
     mutate {
       remove_field => ["timestamp"]
     }
    }
    
    
    
    
    output {
      stdout {
        codec => rubydebug
      }
        if [type] == "java"{
          elasticsearch {
             hosts => ["127.0.0.1:9200"]
             index => "mylog-%{+YYYY.MM.dd}"
          }
        }
    
        if [type] == "java2"{
          elasticsearch {
             hosts => ["127.0.0.1:9200"]
             index => "mylog-%{+YYYY.MM}"
          }
        }
      
    }
    

    异常问题解决方案

    1.logstash: Could not execute action: PipelineAction::Create

    , action_result: false

    解决办法: 斜杆采用“/”

    2, logstash: object mapping for [host] tried to parse field [host] as object, but found a concrete value

    解决办法: 在filter里面添加如下配置:

     mutate {
          rename => { "host" => "host.name" }
        }
    

    其它

    ElasticSearch实战系列:

    音乐推荐

    原创不易,如果感觉不错,希望给个推荐!您的支持是我写作的最大动力!
    版权声明:
    作者:虚无境
    博客园出处:http://www.cnblogs.com/xuwujing
    CSDN出处:http://blog.csdn.net/qazwsxpcm    
    个人博客出处:http://www.panchengming.com

  • 相关阅读:
    C#中静态与非静态方法比较
    Hibernate学习之路-- -映射 继承关系(subclass , joined-subclass,union-subclass )
    网络协议概述:物理层、连接层、网络层、传输层、应用层详解
    phpstorm xdebug配置
    eclipse修改内存大小
    Session机制详解
    java把html标签字符转普通字符(反转换成html标签)(摘抄)
    JAVA调用WCF
    RabbitMQ入门与使用篇
    大话程序猿眼里的高并发
  • 原文地址:https://www.cnblogs.com/xuwujing/p/13520666.html
Copyright © 2011-2022 走看看