zoukankan      html  css  js  c++  java
  • Logstash:如何处理 Logstash pipeline 错误信息

    转载自:https://elasticstack.blog.csdn.net/article/details/114290663

    在我们使用 Logstash 的时候经常会出现一些错误。比如当我们使用 dissect 这样的 filter 时,会出现格式不匹配从而导致错误。那么我们该如何处理这类错误呢?当 dissect 遇到错误的格式不能进行解析时,会为文档添加一个叫做 _dissectfailure 的标签,并继续处理该事件:
    
    那么我们该如何处理该类错误的信息呢?
    
    一种比较好的办法就是通过 elasticsearch output 把他存放于另外一个索引中。我们先用如下的例子来进行实验。
    
    dissect.conf
    
        input {
          generator {
            message => "<1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
            count => 1
          }
        }
         
        filter {
          if [message] =~ "THREAT," {
            dissect {
              mapping => {
                message => "<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{logsource} %{pan_fut_use_01},%{pan_rec_time},%{pan_serial_number},%{pan_type},%{pan_subtype},%{pan_fut_use_02},%{pan_gen_time},%{pan_src_ip},%{pan_dst_ip},%{pan_nat_src_ip},%{pan_nat_dst_ip},%{pan_rule_name},%{pan_src_user},%{pan_dst_user},%{pan_app},%{pan_vsys},%{pan_src_zone},%{pan_dst_zone},%{pan_ingress_intf},%{pan_egress_intf},%{pan_log_fwd_profile},%{pan_fut_use_03},%{pan_session_id},%{pan_repeat_cnt},%{pan_src_port},%{pan_dst_port},%{pan_nat_src_port},%{pan_nat_dst_port},%{pan_flags},%{pan_prot},%{pan_action},%{pan_misc},%{pan_threat_id},%{pan_cat},%{pan_severity},%{pan_direction},%{pan_seq_number},%{pan_action_flags},%{pan_src_location},%{pan_dst_location},%{pan_content_type},%{pan_pcap_id},%{pan_filedigest},%{pan_cloud},%{pan_user_agent},%{pan_file_type},%{pan_xff},%{pan_referer},%{pan_sender},%{pan_subject},%{pan_recipient},%{pan_report_id},%{pan_anymore}"
              }
            }
          }
        }
    
    
    ​     
        output {
        	stdout { 
        		codec => rubydebug 
        	}
        }
    
    上面的 pipeline 在正常没有错误的情况下,会生成如下的结果:
    
    现在假如我们修改上面的 generator 部分。在它的前面添加一个空格:
    
        input {
          generator {
            message => " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
            count => 1
          }
        }
    
    由于 dissect 对格式是非常的挑剔。格式不对那么它就会生成一个错误。为此,它会为文档添加一个叫做 _dissectfailure 的标签。我们可以依据这个标签,把文档保存于一个叫做 parsefailures 的索引中:
    
        input {
          generator {
            message => " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
            count => 100
          }
        }
         
        filter {
          if [message] =~ "THREAT," {
            dissect {
              mapping => {
                message => "<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{logsource} %{pan_fut_use_01},%{pan_rec_time},%{pan_serial_number},%{pan_type},%{pan_subtype},%{pan_fut_use_02},%{pan_gen_time},%{pan_src_ip},%{pan_dst_ip},%{pan_nat_src_ip},%{pan_nat_dst_ip},%{pan_rule_name},%{pan_src_user},%{pan_dst_user},%{pan_app},%{pan_vsys},%{pan_src_zone},%{pan_dst_zone},%{pan_ingress_intf},%{pan_egress_intf},%{pan_log_fwd_profile},%{pan_fut_use_03},%{pan_session_id},%{pan_repeat_cnt},%{pan_src_port},%{pan_dst_port},%{pan_nat_src_port},%{pan_nat_dst_port},%{pan_flags},%{pan_prot},%{pan_action},%{pan_misc},%{pan_threat_id},%{pan_cat},%{pan_severity},%{pan_direction},%{pan_seq_number},%{pan_action_flags},%{pan_src_location},%{pan_dst_location},%{pan_content_type},%{pan_pcap_id},%{pan_filedigest},%{pan_cloud},%{pan_user_agent},%{pan_file_type},%{pan_xff},%{pan_referer},%{pan_sender},%{pan_subject},%{pan_recipient},%{pan_report_id},%{pan_anymore}"
              }
            }
          }
        }
    
    
    ​     
        output {
        	stdout { 
        		codec => rubydebug 
        	}
         
          if "_dissectfailure" in [tags] {
            elasticsearch {
              index => "parsefailures"
              hosts => [ "localhost:9200" ]
            }
          }
        }
    
    在上面我有意识地把 generator 中的 count 增加到100。这样确保在 Logstash 退出之前,有时间把内容写到 Elasticsearch 中去。我们重新运行 Logstash:
    
    我们发现一个错误的信息。它说明在使用 dissect filter 时导致错误。我们可以在 Kibana 中检查 parsefailures 这个索引:
    
    GET parsefailures/_search
    
        {
          "took" : 1,
          "timed_out" : false,
          "_shards" : {
            "total" : 1,
            "successful" : 1,
            "skipped" : 0,
            "failed" : 0
          },
          "hits" : {
            "total" : {
              "value" : 102,
              "relation" : "eq"
            },
            "max_score" : 1.0,
            "hits" : [
              {
                "_index" : "parsefailures",
                "_type" : "_doc",
                "_id" : "3Llu8ncBReLdFyHVZsv0",
                "_score" : 1.0,
                "_source" : {
                  "@timestamp" : "2021-03-02T10:13:45.332Z",
                  "tags" : [
                    "_dissectfailure"
                  ],
                  "sequence" : 0,
                  "message" : " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54",
                  "host" : "liuxg",
                  "@version" : "1"
                }
              },
              {
                "_index" : "parsefailures",
                "_type" : "_doc",
                "_id" : "37l08ncBReLdFyHVUcs4",
                "_score" : 1.0,
                "_source" : {
                  "tags" : [
                    "_dissectfailure"
                  ],
                  "host" : "liuxg",
                  "@timestamp" : "2021-03-02T10:20:44.841Z",
                  "sequence" : 12,
                  "message" : " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54",
                  "@version" : "1"
                }
              },
          ...
    
    
  • 相关阅读:
    jQuery学习之------对标签属性的操作
    jQuery学习之------选择器
    PHP读取mysql中的数据
    sql server 数据库创建链接服务器访问另外一个sql server 数据库
    SQLServer使用链接服务器远程查询
    解决SQL Server 阻止了对组件 'Ad Hoc Distributed Queries' 的 STATEMENT'OpenRowset/OpenDatasource' 的访问的方法
    Delphi XE10百集视频教程计划
    Windows 版本的iTunes 修改iPhone的备份路径
    Centos7 下mysql 密码重置
    Windows server 2012文件服务器配置
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/14570777.html
Copyright © 2011-2022 走看看