zoukankan      html  css  js  c++  java
  • dataView logstash flume比较

    比较

    flume文件一般用于将本地日志文件上传到kafka中

    flume的原理:https://www.cnblogs.com/zhangyinhua/p/7803486.html

    flume的使用:https://www.cnblogs.com/ciade/p/5495218.html

    logstash用于对接日志文件(nginx、windows、java,MySQL等等)、Redis和kafka等等,将数据存储到es中

    dataview支持本地文件到es,但优先推荐上面两种方式;支持数据库数据迁移到es中,但性能不是很好。不推荐使用

    flume官网:http://flume.apache.org/

    flume对接HTTPsink:https://blog.csdn.net/kkillala/article/details/82155845

    flume1.8支持的sink类型:https://www.cnblogs.com/swordfall/p/8157766.html

    logstash

    组成

    有input、filter和output三个部分组成,所以其脚本也是有这三个部分组成

    input {}
    filter {}
    output {}

    其中必须要有input和output两个,如果不指定,就默认使用的是stdin和stdout
    最简单的配置语句,输入输出到控制台
    input {stdin{}}
    output {stdout{}}

    常用命令

    查看插件列表

    logstash-plugin list

    启动logstash

    .logstash -f logstash-csv-stdout.conf --config.reload.automatic
    .logstash -f logstash-csv-stdout.conf --config.reload.automatic --config.reload.interval 3

    .logstash -f D:logstashconfiglogstash-txt-json-es.conf --config.reload.automatic --path.data=.data
    .logstash -f D:logstashconfiglogstash-kafka-json-es.conf --config.reload.automatic --path.data=.data

    默认加载logstash脚本统计目录的conf文件,也可以指定conf文件的完整路径

    --config.reload.automatic:conf文件修改时可以自动重新加载,类似nginx的reload,都是先起个新的进程或线程,没有问题时再把旧的关掉

    --config.reload.interval :重新加载conf文件检测的时间间隔,默认是3s

    --path.data:进程文件锁,logstash默认只能单实例启动,因为使用的同一个目录的文件锁,如果给每个实例都指定独立的文件路径,那么就可以多实例运行。

    自动加载配置的处理逻辑:

    https://blog.csdn.net/qq_32292967/article/details/78622647

    事件与消息

    数据在线程之间以事件的形式流传。不要叫行,因为 logstash 可以处理多行事件。
    事件就是一个 Ruby 对象,或者更简单的理解为就是一个哈希也行。可以随意给事件添加字段或者从事件里删除字段。
    每个 logstash 过滤插件,都会有四个方法叫 add_tag , remove_tag ,add_field 和 remove_field 。它们在插件过滤匹配成功时生效。

    事件中的默认字段

    1. host 标记事件发生在哪里。
    2. type 标记事件的唯一类型。
    3. tags 标记事件的某方面属性。这是一个数组,一个事件可以有多个标签。
    4. @timestamp,用来标记事件的发生时间。因为这个字段涉及到 Logstash的内部流转,所以必须是一个joda对象,如果你尝试自己给一个字符串字段重命名为 @timestamp 的话,Logstash会直接报错。所以,请使用 filters/date 插件 来管理这个特殊字段。使用这个插件可以将其他事件字段的值替换掉@timestamp的默认值,因为这个值取的是logstash的服务端时间,有时并不满足需求,需要替换成真实日志的时间,并且这个字段用的是UTC的时间,如果是东八区,那么时间就会差8个小时,这个时候如果用它来动态创建日期索引,就会出现问题,所以这个字段的值一般都是要替换掉的,当然也不要通过ruby代码将UTC的时间认为的加8个小时,这是不对的。

    UTC时间介绍:

    https://www.cnblogs.com/doit8791/p/10398997.html

    加8小时的错误方法和解决方法

    https://blog.csdn.net/u011431128/article/details/79545240?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-3.channel_param&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-3.channel_param

    东八区是能够根据正确时间字段动态创建索引的方法时,使用日志里面时间字段的时间,新建一个时间字段,该字段就用来存储index的动态日期名字,比如说:

    filter {
      date {
        match => ["LogDateTime","yyyy-MM-dd HH:mm:ss.SSS"]    #message在实际应用中修改为自己的字段
        target => "index_day"
      }
    
      ruby {
        code => '
            #这个不行,还是0时区的
    #        event.set("@timestamp",event.get("@timestamp").getlocal)
            #下面的方案虽然暂时可以得到形式上的东8区的时间,但其实是0时区的,真实时间多了8小时,后面ES或者kibana使用该字段时都会有问题,但是如果不使用也没有问题,这样可以解决创建索引的日期对不上的问题
    #        event.set("timestamp", event.get("@timestamp").time.localtime + 8*60*60)
    #        event.set("@timestamp",event.get("timestamp"))
            event.set("index_day", event.get("index_day").time.localtime.strftime("%Y%m%d"))
        '
    #    remove_field => ["timestamp"]
      }
      mutate {
        remove_field => ["@version", "message", "path", "host", "geoip"]
      }
    }
    
    output {
      elasticsearch {
         index => "indexpreifx-%{index_day}" #使用index_day字段里面的时间
         index => "indexpreifx-%{+YYYY.MM.dd}" #使用@timastamp里面的时间
      }
      stdout {
        codec => json_lines
      }

    5. @version

    事件的作用

    合并多行数据(Multiline)
    有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内
    容。这种日志通常都很难通过命令行解析的方式做分析。

    编码插件codec

    这个插件不仅可以在filter里使用,在input和output里也都可以使用

    codec是coder和decoder的缩写,logstash的整个处理过程就变成了input、decoder、filter、encoder和output这几个过程。
    codec就是用来处理decoder和encode过程的。
    codec的引入是的输入的日志格式不只是文本的格式,可以通过codec指定任意的日志格式,举例如下

    将消息自动转化为json格式

    codec => json
    codec => json_lines

    指定编码格式

    codec => json {charset=>"GBK"}

    为一个事件输出多行日志,常用于堆栈信息的整理

    codec => multiline {
        pattern => "^["
        negate => true
        what => "previous"
    }

    字段引用

    字段是 Logstash::Event 对象的属性。我们之前提过事件就像一个哈希一样,如果你想在 Logstash 配置中使用字段的值,只需要把字段的名字写在中括号 []里就行了

    Logstash 还支持变量内插,在字符串里使用字段引用的方法是这样:
    "the longitude is %{[geoip][location][0]}"
    如果是单层变量,不用[],直接使用%{变量名}也是可以的

    输入类型

    基本上碰到的接口协议都支持,包括tcp、http、jmx等等
    控制台输入

      stdin {
        codec => json {charset=>"GBK"}
      }

    本地文件

      file {
        path => ["D:logstashdatajson.txt"]  
        start_position => "beginning"
        sincedb_path => "D:logstashdatasincedb_path	xt-json.txt"
        codec => "json"
      }

    log4j日志框架的日志

    TCP数据流

    kafka数据流

      kafka {
        bootstrap_servers => "xx:9093,xx:9093"
        client_id => "xxx"
        group_id => "xxx"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["xxxx"]
        type => "bhy"
        codec => json  
        security_protocol => "SASL_PLAINTEXT"
        sasl_mechanism => "PLAIN"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxxx';"
      }

    filter插件

    条件判断

    通过条件判断可以处理满足指定条件的消息,不满足条件的就会被提前丢弃

    if [Key] != 10007 {
      drop{}
    }

    日期处理

      date {
        match => ["LogDateTime","yyyy-MM-dd HH:mm:ss.SSS"]
        target => "index_day"
      }

    date 插件可以用来转换你的日志记录中的时间字符串,变成 LogStash::Timestamp 对象,然后转存到 @timestamp 字段里。

    注意:因为在稍后的outputs/elasticsearch中常用的%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用 filters/date 转换后删除自己的字段!因为使用UTC的时区,所以使用这个字段在东8区创建索引时有问题,因此应该用上面@timestamp处的方案介绍。

    grok正则捕获

    性能略差,谨慎使用

    grok{
        match=>{
            "message"=>"s+(?<request_time>d+(?:.d+)?)s+"
        }
    }

    从中可以解析出新的request_time字段
    表达式语法,默认预定义一些正则表达式变量,也可以自定义表达式变量,放到指定路径的文件,可以直接使用这些变量解析出新的字段,并且也可以指定新的字段的数据类型,就可以避免在使用mutate的convert再对字段类型进行转换了。
    grok 表达式的打印赋值格式的完整语法是下面这样的:

    %{PATTERN_NAME:capture_name:data_type}

    :data_type 目前只支持两个值: int 和 float 。
    所以我们可以改进我们的配置成下面这样:

    filter{
        grok{
            match=>{
                "message"=>"%{WORD} %{NUMBER:request_time:float} %{WORD}"
            }
        }
    }

    使用存储到配置文件的表达式格式如下:

    filter{
        grok{
            patterns_dir=>["/path/to/your/own/patterns"]match=>{
                "message"=>"%{SYSLOGBASE} %{DATA:message}"
            }overwrite=>["message"]
        }
    }

    匹配多个正则表达式
    因为日志格式可能不止一种,需要灵活配置多个,其实可以设置成列表的格式,如下:

    grok{
        match=>{
            "message"=>["%{WORD} %{NUMBER:request_time:float} %{WORD}",
            "%{NUMBER:request_time:int} %{WORD}"]
        }
    }

    dissect

    替代grok的性能较好,使用较简单的插件

    GeoIP

    地址信息查询插件,查询IP对应的省市和经纬度,对于地图的可视化展示。当然只能查询到公网的IP,内部网的IP就不支持了。

    mutate

    插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。

    convert:类型转换

      mutate {
        convert => {
          "id" => "integer"
          "age" => "integer"
          "name" => "string"
        }
      }

    支持转换的类型包括:interger,float,boolean和string。注意不支持long和date类型。date转换是需要使用filter里的date插件进行操作。如果使用了不支持的数据类型,就会出现奇怪的错误如下:

    LogStash::ConfigurationError: translation missing: en.logstash.agent.configuration.invalid_plugin_register。

    字符串处理

    常用的操作基本都有。

    正则替换

    灵活运用,可以实现增删改查、截取的各种效果

      mutate {
           gsub => ["urlparams", "[\?#]", "_"]
           gsub => ["urlparams", "A", "a"]
      }    
    split

    根据指定的分隔符,将字段从字符串类型转成数组类型

    join

    split的反向操作

    merge

    合并两个数组或者哈希字段

    strip

    去除左右两端的空格

    lowercase、uppercase

    字段处理

    rename

    重命名某个字段

    需要注意的是mutate里面各个操作是有顺序,并不是按照定义的顺序执行的

    json

    将指定的输入内容为JSON的字段解析为多个json结构中的字段
    如果输入字段的内容是一个字符串,那么可以直接将这个json字符串直接拆分成几个字段

    json {
      source => "Value_json"
      remove_field => ["Value_json"]
    }

    但是如果内容是一个json对象,那么就不能直接拆分了,做法是先将json对象的字符串塞进另一个新建的字段中,然后再用上面的方法拆分这个新字段,拆分之后就将这个新字段删除掉,也可以将原来的json对象字段也删除掉,这样留下的就是拆分后的各字段了。下面中的Value字段是一个JSON对象,该JSON对象内又嵌套的有MenuInfo和LogData字段。

      mutate {
        add_field => { "Value_json" => "%{[Value]}" }
        add_field => { "MenuInfo_json" => "%{[Value][MenuInfo]}" }
        add_field => { "LogData_json" => "%{[Value][LogData]}" }
        remove_field => ["Value"]
      }
      json {
        source => "Value_json"
        remove_field => ["Value_json"]
        remove_field => ["MenuInfo"]
        remove_field => ["LogData"]
      }
      json {
        source => "MenuInfo_json"
        remove_field => ["MenuInfo_json"]
      }
      json {
        source => "LogData_json"
        remove_field => ["LogData_json"]
      }

    metrics

    内存里实现实时的计数和采样分析

    ruby

    通过编写ruby代码实现灵活的控制

    输出类型

    elasticsearch

    output {
        elasticsearch {
            hosts => ["192.168.0.2:9280","192.168.0.3:9280"]
            user => 'xxx'
            password => 'xxxxxxx'
            index => "logstash-%{type}-%{+YYYY.MM.dd}"
            document_type => "%{type}"
            flush_size => 20000         #批量写入ES的最大条数
            idle_flush_time => 10       #批量写入ES的最大延时,所以每次批量写入ES的条数由上面这两个参数确定的,那个先满足都会触发
            sniffing => true            #当指定节点不可用时,自动寻找其他节点
            template_overwrite => true  # 是否重写模板
        }
    }

    index => "logstash-%{type}-%{+YYYY.MM.dd}"
    可以通过变量来灵活指定index的名字,这样可以匹配预先定义好的模板,自动创建新的index。这里面的时间就是取自于之前的@timestamp字段,所以如果在这里想要使用日志格式化,就不能随意的删除时间字段

    批量写入ES的参数控制

    低版本的是通过flush_size和idle_flush_time参数控制的,5.0之后还会受patch_size这个参数的控制,因为filter-output线程跟input线程分开了,这个参数是filter-output线程批量接收input队列消息数的数目,相当于每个filter-output线程控制批量写入ES,跟ES有一个连接池,而不是只有一个连接。


    template:logstash-*
    默认是一个上面的名字的模板,符合上面的命名的话都会应用这个模板。不起这样的名字就不会用到这个模板。

    {
      "order": 0,
      "version": 60001,
      "index_patterns": [
        "logstash-*"
      ],
      "settings": {
        "index": {
          "refresh_interval": "5s"
        }
      },
      "mappings": {
        "_default_": {
          "dynamic_templates": [
            {
              "message_field": {
                "path_match": "message",
                "match_mapping_type": "string",
                "mapping": {
                  "type": "text",
                  "norms": false
                }
              }
            },
            {
              "string_fields": {
                "match": "*",
                "match_mapping_type": "string",
                "mapping": {
                  "type": "text",
                  "norms": false,
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                }
              }
            }
          ],
          "properties": {
            "@timestamp": {
              "type": "date"
            },
            "@version": {
              "type": "keyword"
            },
            "geoip": {
              "dynamic": true,
              "properties": {
                "ip": {
                  "type": "ip"
                },
                "location": {
                  "type": "geo_point"
                },
                "latitude": {
                  "type": "half_float"
                },
                "longitude": {
                  "type": "half_float"
                }
              }
            }
          }
        }
      },
      "aliases": {}
    }

    https://www.cnblogs.com/yb38156/p/13054004.html

    input,queue,pipline运行的线程模型

     

    简述:

    (1)每个Input启动一个线程,从对应数据源获取数据

    (2)Input会将数据写入一个队列:默认为内存中的有界队列(意外停止会导致数据丢失)。为了防止数丢失Logstash提供了两个特性:

    Persistent Queues:通过磁盘上的queue来防止数据丢失。默认使用的是memory,需要手动调整为persisted

    Dead Letter Queues:保存无法处理的event(仅支持Elasticsearch作为输出源) 。默认不开启,需要手动开启

    上面的配置都是在logstash.yml文件中。

    (3)Logstash会有多个pipeline worker, 每一个pipeline worker会从队列中取一批数据,然后执行filter和output(worker数目及每次处理的数据量由配置确定)

    Persistent Queue说明

    https://www.elastic.co/guide/en/logstash/current/persistent-queues.html

    提供at-least-once保证消息不丢失,但不保证不重复

    局限性:tcp,udp,ZeroMQ,以及没有ack机制的input类型,无法保证数据不丢失;持久化硬盘损坏,故障情况无法处理

  • 相关阅读:
    Spring 由缓存切点驱动的通知者
    Spring 缓存切面
    Spring 缓存注解 SpEL 表达式解析
    Spring 缓存注解解析过程
    SpEL 实例
    占位符解析
    RequestMapping 注解的解析、匹配、注册
    构建与众不同的软件
    [转载]Windows服务编写原理及探讨(4)
    [转载]Windows服务编写原理及探讨(3)
  • 原文地址:https://www.cnblogs.com/suntp/p/10320355.html
Copyright © 2011-2022 走看看