一、介绍
Logstash是一个开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到最喜欢的存储库中(我们的存储库当然是ElasticSearch)
Logstash是充当数据处理的需求的,当我们的数据需要处理的时候,会将它发送到Logstash进行处理,否则直接送到ElasticSearch中
二、用途
Logstash可以处理各种各样的输入,从文档,图表中=,数据库中,然后处理完后,发送到
三、Logstash部署
Logstash主要是将数据源的数据进行一行一行的处理,同时还直接过滤切割等功能。
3.1、下载
# node1是新开的一台机器,filebeat、metricbeat、nginx、redis、tomcat都是部署在这里的
[root@node1 app]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.5.4.tar.gz
3.2、部署
[root@node1 app]# tar -zxvf logstash-6.5.4.tar.gz
[root@node1 app]# mv logstash-6.5.4 logstash && cd logstash
#第一个logstash示例,需要一段时间才能起来
[root@node1 logstash]# bin/logstash -e 'input { stdin { } } output { stdout {} }'
666
{
"@timestamp" => 2020-12-06T13:50:02.537Z,
"@version" => "1",
"message" => "666",
"host" => "node1"
}
四、配置详解
Logstash的配置有三部分,如下所示
input { #输入
stdin { ... } #标准输入
}
filter { #过滤,对数据进行分割、截取等处理
...
}
output { #输出
stdout { ... } #标准输出
}
4.1、输入
- 采集各种样式、大小和来源的数据,数据往往以各种各样的形式,或分散或集中地存在于很多系统中。
- Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
4.2、过滤
- 实时解析和转换数据
- 数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
4.3、输出
Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
五、读取自定义日志
前面我们通过Filebeat读取了nginx的日志,如果是自定义结构的日志,就需要读取处理后才能使用,所以,这个时候就需要使用Logstash了,因为Logstash有着强大的处理能力,可以应对各种各样的场景。
日志结构
2019-03-15 21:21:21|ERROR|1 读取数据出错|参数:id=1002
可以看到,日志中的内容是使用“|”进行分割的,使用,我们在处理的时候,也需要对数据做分割处理。
编写配置文件
[root@node1 logstash]# vim mogublog-pipeline.conf
然后添加如下内容
input {
file {
path => "/app/test/logs/app.log"
start_position => "beginning"
}
}
filter {
mutate {
split => {"message"=>"|"}
}
}
output {
stdout { codec => rubydebug }
}
启动
#启动
[root@node1 logstash]# ./bin/logstash -f ./mogublog-pipeline.conf
然后我们就插入我们的测试数据
[root@node1 logs]# echo "2020-12-06 21:21:21|ERROR|读取数据出错|参数:id=1001" >> app.log
然后我们就可以看到logstash就会捕获到刚刚我们插入的数据,同时我们的数据也被分割了
{
"path" => "/app/test/logs/app.log",
"@timestamp" => 2020-12-06T14:08:42.041Z,
"host" => "node1",
"message" => [
[0] "2020-12-06 21:21:21",
[1] "ERROR",
[2] "读取数据出错",
[3] "参数:id=1001"
],
"@version" => "1"
}
输出到Elasticsearch
我们可以修改我们的配置文件,将我们的日志记录输出到ElasticSearch中
input {
file {
path => "/soft/beats/logs/app.log"
start_position => "beginning"
}
}
filter {
mutate {
split => {"message"=>"|"}
}
}
output {
elasticsearch {
hosts => ["192.168.1.111:9200","192.168.1.112:9200","192.168.1.113:9200"]
}
}
然后在重启我们的logstash
[root@node1 logstash]# ./bin/logstash -f ./mogublog-pipeline.conf
然后向日志记录中,插入两条数据
[root@node1 logs]# echo "2020-12-06 21:21:21|ERROR|读取数据出错|参数:id=1001" >> app.log
[root@node1 logs]# echo "2020-12-06 21:21:21|ERROR|读取数据出错|参数:id=1002" >> app.log
最后就能够在页面看到我们刚刚插入的数据了
{
"_index": "logstash-2020.12.06",
"_type": "doc",
"_id": "TMJpOHYBiDbkYSn5kqnq",
"_version": 1,
"_score": 1,
"_source": {
"path": "/app/test/logs/app.log",
"@version": "1",
"host": "node1",
"@timestamp": "2020-12-06T14:16:50.197Z",
"message": [
"2020-12-06 21:21:21"
,
"ERROR"
,
"读取数据出错"
,
"参数:id=1004"
]
}
}