比较
flume文件一般用于将本地日志文件上传到kafka中
flume的原理:https://www.cnblogs.com/zhangyinhua/p/7803486.html
flume的使用:https://www.cnblogs.com/ciade/p/5495218.html
logstash用于对接日志文件(nginx、windows、java,MySQL等等)、Redis和kafka等等,将数据存储到es中
dataview支持本地文件到es,但优先推荐上面两种方式;支持数据库数据迁移到es中,但性能不是很好。不推荐使用
flume官网:http://flume.apache.org/
flume对接HTTPsink:https://blog.csdn.net/kkillala/article/details/82155845
flume1.8支持的sink类型:https://www.cnblogs.com/swordfall/p/8157766.html
logstash
组成
有input、filter和output三个部分组成,所以其脚本也是有这三个部分组成
input {}
filter {}
output {}
其中必须要有input和output两个,如果不指定,就默认使用的是stdin和stdout
最简单的配置语句,输入输出到控制台
input {stdin{}}
output {stdout{}}
常用命令
查看插件列表
logstash-plugin list
启动logstash
.logstash -f logstash-csv-stdout.conf --config.reload.automatic
.logstash -f logstash-csv-stdout.conf --config.reload.automatic --config.reload.interval 3
.logstash -f D:logstashconfiglogstash-txt-json-es.conf --config.reload.automatic --path.data=.data
.logstash -f D:logstashconfiglogstash-kafka-json-es.conf --config.reload.automatic --path.data=.data
默认加载logstash脚本统计目录的conf文件,也可以指定conf文件的完整路径
--config.reload.automatic:conf文件修改时可以自动重新加载,类似nginx的reload,都是先起个新的进程或线程,没有问题时再把旧的关掉
--config.reload.interval :重新加载conf文件检测的时间间隔,默认是3s
--path.data:进程文件锁,logstash默认只能单实例启动,因为使用的同一个目录的文件锁,如果给每个实例都指定独立的文件路径,那么就可以多实例运行。
自动加载配置的处理逻辑:
https://blog.csdn.net/qq_32292967/article/details/78622647
事件与消息
数据在线程之间以事件的形式流传。不要叫行,因为 logstash 可以处理多行事件。
事件就是一个 Ruby 对象,或者更简单的理解为就是一个哈希也行。可以随意给事件添加字段或者从事件里删除字段。
每个 logstash 过滤插件,都会有四个方法叫 add_tag , remove_tag ,add_field 和 remove_field 。它们在插件过滤匹配成功时生效。
事件中的默认字段
1. host 标记事件发生在哪里。
2. type 标记事件的唯一类型。
3. tags 标记事件的某方面属性。这是一个数组,一个事件可以有多个标签。
4. @timestamp,用来标记事件的发生时间。因为这个字段涉及到 Logstash的内部流转,所以必须是一个joda对象,如果你尝试自己给一个字符串字段重命名为 @timestamp 的话,Logstash会直接报错。所以,请使用 filters/date 插件 来管理这个特殊字段。使用这个插件可以将其他事件字段的值替换掉@timestamp的默认值,因为这个值取的是logstash的服务端时间,有时并不满足需求,需要替换成真实日志的时间,并且这个字段用的是UTC的时间,如果是东八区,那么时间就会差8个小时,这个时候如果用它来动态创建日期索引,就会出现问题,所以这个字段的值一般都是要替换掉的,当然也不要通过ruby代码将UTC的时间认为的加8个小时,这是不对的。
UTC时间介绍:
https://www.cnblogs.com/doit8791/p/10398997.html
加8小时的错误方法和解决方法
东八区是能够根据正确时间字段动态创建索引的方法时,使用日志里面时间字段的时间,新建一个时间字段,该字段就用来存储index的动态日期名字,比如说:
filter { date { match => ["LogDateTime","yyyy-MM-dd HH:mm:ss.SSS"] #message在实际应用中修改为自己的字段 target => "index_day" } ruby { code => ' #这个不行,还是0时区的 # event.set("@timestamp",event.get("@timestamp").getlocal) #下面的方案虽然暂时可以得到形式上的东8区的时间,但其实是0时区的,真实时间多了8小时,后面ES或者kibana使用该字段时都会有问题,但是如果不使用也没有问题,这样可以解决创建索引的日期对不上的问题 # event.set("timestamp", event.get("@timestamp").time.localtime + 8*60*60) # event.set("@timestamp",event.get("timestamp")) event.set("index_day", event.get("index_day").time.localtime.strftime("%Y%m%d")) ' # remove_field => ["timestamp"] } mutate { remove_field => ["@version", "message", "path", "host", "geoip"] } } output { elasticsearch { index => "indexpreifx-%{index_day}" #使用index_day字段里面的时间 index => "indexpreifx-%{+YYYY.MM.dd}" #使用@timastamp里面的时间 } stdout { codec => json_lines }
5. @version
事件的作用
合并多行数据(Multiline)
有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内
容。这种日志通常都很难通过命令行解析的方式做分析。
编码插件codec
这个插件不仅可以在filter里使用,在input和output里也都可以使用
codec是coder和decoder的缩写,logstash的整个处理过程就变成了input、decoder、filter、encoder和output这几个过程。
codec就是用来处理decoder和encode过程的。
codec的引入是的输入的日志格式不只是文本的格式,可以通过codec指定任意的日志格式,举例如下
将消息自动转化为json格式
codec => json
codec => json_lines
指定编码格式
codec => json {charset=>"GBK"}
为一个事件输出多行日志,常用于堆栈信息的整理
codec => multiline { pattern => "^[" negate => true what => "previous" }
字段引用
字段是 Logstash::Event 对象的属性。我们之前提过事件就像一个哈希一样,如果你想在 Logstash 配置中使用字段的值,只需要把字段的名字写在中括号 []里就行了
Logstash 还支持变量内插,在字符串里使用字段引用的方法是这样:
"the longitude is %{[geoip][location][0]}"
如果是单层变量,不用[],直接使用%{变量名}也是可以的
输入类型
基本上碰到的接口协议都支持,包括tcp、http、jmx等等
控制台输入
stdin { codec => json {charset=>"GBK"} }
本地文件
file { path => ["D:logstashdatajson.txt"] start_position => "beginning" sincedb_path => "D:logstashdatasincedb_path xt-json.txt" codec => "json" }
log4j日志框架的日志
TCP数据流
kafka数据流
kafka { bootstrap_servers => "xx:9093,xx:9093" client_id => "xxx" group_id => "xxx" auto_offset_reset => "latest" consumer_threads => 1 decorate_events => true topics => ["xxxx"] type => "bhy" codec => json security_protocol => "SASL_PLAINTEXT" sasl_mechanism => "PLAIN" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxxx';" }
filter插件
条件判断
通过条件判断可以处理满足指定条件的消息,不满足条件的就会被提前丢弃
if [Key] != 10007 { drop{} }
日期处理
date { match => ["LogDateTime","yyyy-MM-dd HH:mm:ss.SSS"] target => "index_day" }
date 插件可以用来转换你的日志记录中的时间字符串,变成 LogStash::Timestamp 对象,然后转存到 @timestamp 字段里。
注意:因为在稍后的outputs/elasticsearch中常用的%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用 filters/date 转换后删除自己的字段!因为使用UTC的时区,所以使用这个字段在东8区创建索引时有问题,因此应该用上面@timestamp处的方案介绍。
grok正则捕获
性能略差,谨慎使用
grok{ match=>{ "message"=>"s+(?<request_time>d+(?:.d+)?)s+" } }
从中可以解析出新的request_time字段
表达式语法,默认预定义一些正则表达式变量,也可以自定义表达式变量,放到指定路径的文件,可以直接使用这些变量解析出新的字段,并且也可以指定新的字段的数据类型,就可以避免在使用mutate的convert再对字段类型进行转换了。
grok 表达式的打印赋值格式的完整语法是下面这样的:
%{PATTERN_NAME:capture_name:data_type}
:data_type 目前只支持两个值: int 和 float 。
所以我们可以改进我们的配置成下面这样:
filter{ grok{ match=>{ "message"=>"%{WORD} %{NUMBER:request_time:float} %{WORD}" } } }
使用存储到配置文件的表达式格式如下:
filter{ grok{ patterns_dir=>["/path/to/your/own/patterns"]match=>{ "message"=>"%{SYSLOGBASE} %{DATA:message}" }overwrite=>["message"] } }
匹配多个正则表达式
因为日志格式可能不止一种,需要灵活配置多个,其实可以设置成列表的格式,如下:
grok{ match=>{ "message"=>["%{WORD} %{NUMBER:request_time:float} %{WORD}", "%{NUMBER:request_time:int} %{WORD}"] } }
dissect
替代grok的性能较好,使用较简单的插件
GeoIP
地址信息查询插件,查询IP对应的省市和经纬度,对于地图的可视化展示。当然只能查询到公网的IP,内部网的IP就不支持了。
mutate
插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。
convert:类型转换
mutate { convert => { "id" => "integer" "age" => "integer" "name" => "string" } }
支持转换的类型包括:interger,float,boolean和string。注意不支持long和date类型。date转换是需要使用filter里的date插件进行操作。如果使用了不支持的数据类型,就会出现奇怪的错误如下:
LogStash::ConfigurationError: translation missing: en.logstash.agent.configuration.invalid_plugin_register。
字符串处理
常用的操作基本都有。
正则替换
灵活运用,可以实现增删改查、截取的各种效果
mutate { gsub => ["urlparams", "[\?#]", "_"] gsub => ["urlparams", "A", "a"] }
split
根据指定的分隔符,将字段从字符串类型转成数组类型
join
split的反向操作
merge
合并两个数组或者哈希字段
strip
去除左右两端的空格
lowercase、uppercase
字段处理
rename
重命名某个字段
需要注意的是mutate里面各个操作是有顺序,并不是按照定义的顺序执行的
json
将指定的输入内容为JSON的字段解析为多个json结构中的字段
如果输入字段的内容是一个字符串,那么可以直接将这个json字符串直接拆分成几个字段
json { source => "Value_json" remove_field => ["Value_json"] }
但是如果内容是一个json对象,那么就不能直接拆分了,做法是先将json对象的字符串塞进另一个新建的字段中,然后再用上面的方法拆分这个新字段,拆分之后就将这个新字段删除掉,也可以将原来的json对象字段也删除掉,这样留下的就是拆分后的各字段了。下面中的Value字段是一个JSON对象,该JSON对象内又嵌套的有MenuInfo和LogData字段。
mutate { add_field => { "Value_json" => "%{[Value]}" } add_field => { "MenuInfo_json" => "%{[Value][MenuInfo]}" } add_field => { "LogData_json" => "%{[Value][LogData]}" } remove_field => ["Value"] } json { source => "Value_json" remove_field => ["Value_json"] remove_field => ["MenuInfo"] remove_field => ["LogData"] } json { source => "MenuInfo_json" remove_field => ["MenuInfo_json"] } json { source => "LogData_json" remove_field => ["LogData_json"] }
metrics
内存里实现实时的计数和采样分析
ruby
通过编写ruby代码实现灵活的控制
输出类型
elasticsearch
output { elasticsearch { hosts => ["192.168.0.2:9280","192.168.0.3:9280"] user => 'xxx' password => 'xxxxxxx' index => "logstash-%{type}-%{+YYYY.MM.dd}" document_type => "%{type}" flush_size => 20000 #批量写入ES的最大条数 idle_flush_time => 10 #批量写入ES的最大延时,所以每次批量写入ES的条数由上面这两个参数确定的,那个先满足都会触发 sniffing => true #当指定节点不可用时,自动寻找其他节点 template_overwrite => true # 是否重写模板 } }
index => "logstash-%{type}-%{+YYYY.MM.dd}"
可以通过变量来灵活指定index的名字,这样可以匹配预先定义好的模板,自动创建新的index。这里面的时间就是取自于之前的@timestamp字段,所以如果在这里想要使用日志格式化,就不能随意的删除时间字段
批量写入ES的参数控制
低版本的是通过flush_size和idle_flush_time参数控制的,5.0之后还会受patch_size这个参数的控制,因为filter-output线程跟input线程分开了,这个参数是filter-output线程批量接收input队列消息数的数目,相当于每个filter-output线程控制批量写入ES,跟ES有一个连接池,而不是只有一个连接。
template:logstash-*
默认是一个上面的名字的模板,符合上面的命名的话都会应用这个模板。不起这样的名字就不会用到这个模板。
{ "order": 0, "version": 60001, "index_patterns": [ "logstash-*" ], "settings": { "index": { "refresh_interval": "5s" } }, "mappings": { "_default_": { "dynamic_templates": [ { "message_field": { "path_match": "message", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false } } }, { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "text", "norms": false, "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } ], "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "keyword" }, "geoip": { "dynamic": true, "properties": { "ip": { "type": "ip" }, "location": { "type": "geo_point" }, "latitude": { "type": "half_float" }, "longitude": { "type": "half_float" } } } } } }, "aliases": {} }
https://www.cnblogs.com/yb38156/p/13054004.html
input,queue,pipline运行的线程模型
简述:
(1)每个Input启动一个线程,从对应数据源获取数据
(2)Input会将数据写入一个队列:默认为内存中的有界队列(意外停止会导致数据丢失)。为了防止数丢失Logstash提供了两个特性:
Persistent Queues:通过磁盘上的queue来防止数据丢失。默认使用的是memory,需要手动调整为persisted
Dead Letter Queues:保存无法处理的event(仅支持Elasticsearch作为输出源) 。默认不开启,需要手动开启
上面的配置都是在logstash.yml文件中。
(3)Logstash会有多个pipeline worker, 每一个pipeline worker会从队列中取一批数据,然后执行filter和output(worker数目及每次处理的数据量由配置确定)
Persistent Queue说明
https://www.elastic.co/guide/en/logstash/current/persistent-queues.html
提供at-least-once保证消息不丢失,但不保证不重复
局限性:tcp,udp,ZeroMQ,以及没有ack机制的input类型,无法保证数据不丢失;持久化硬盘损坏,故障情况无法处理