装logstash(使用yum安装)
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
添加此文件和内容
vim /etc/yum.repos.d/logstash.repo
[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
3.安装logstash
4.程序启动文件位置
/usr/share/logstash/bin/logstash
5.建立软连接到环境变量位置,方便日后使用
ln -s /usr/share/logstash/bin/logstash /usr/local/bin/logstash
1.创建文件夹 推荐这种方式安装
mkdir /usr/local/logstash
cd /usr/local/logstash
2.下载包 https://www.elastic.co/cn/downloads/past-releases 包下载地址 下载logstash-oss版本
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.7.0.tar.gz
3.解压
tar -zxvf logstash-oss-7.7.0.tar.gz
4.配置环境变量
vim /etc/profile source /etc/profile
export PATH=$PATH:/usr/local/logstash/logstash-7.7.0/bin
5.修改配置文件
vim /usr/local/logstash/logstash-7.7.0/config/logstash.yml
node.name # 节点名称 方便识别
path.data # 持久化存储数据的文件夹,默认是logstash home目录下的data
path.config # 设定pipeline配置文件目录
path.log # 设定pipeline日志文件目录
pipeline.workers # 设定pipeline的线程数
pipeline.batch.size/delay # 设定批量处理数据的数目和延迟
queue.type # 设定队列类型默认是memory 改成queue.type:persisted
queue.max_dytes # 队列总容量 默认是1024mb 改成queue.max_dytes:4096mb
6.简单使用
执行如下命令,进行快速测试(标准输入–> 标准输出)
logstash -e 'input {stdin{}} output {stdout{}}'
7.logstash常用参数
-e :指定logstash的配置信息,可以用于快速测试;
-f :指定logstash的配置文件;可以用于生产环境;
logstash 多实例运行方式
logstash --path.setting instance1 # instance1是cp /usr/local/logstash/logstash-7.7.0/config /usr/local/logstash/logstash-7.7.0/instance1 打开instance1/logstash.yml 修改node.name和path.data和path.config 保证不一样
logstash --path.setting instance2
不同的instance中修改longstash.yml,自定义path.data 确保其不同即可
8.一个实例多个配置文件
logstash -f /usr/local/logstash/config/conf -r # /usr/local/logstash/config/conf:是放置配置文件的路径,启动时候制定路径就可以
我们可以把多个配置文件放到/usr/local/logstash/config/conf路径中
cp ck-logstash.conf /usr/local/logstash/config/conf
cp ck-logstash_yun.conf /usr/local/logstash/config/conf
pipeline配置语法
1.数据类型
boolean
- isFailed => true
数值型
- port => 33
字符串
- name => "hello world"
数组
path => ["/root", "/home"]
哈希
match => {
'field1' => "value1"
'field2' => "value2"
}
注释
# this is a comment # 表示注释
2.在配置中引用logstash event的属性
直接应用
if[request] # 使用[]
字符串sprintf方式
"request is %{request}" # 使用 %{}
3.if
if true{
.....
}else if true{
.....
}else{
.....
}
4.表达式
==, !=, <, >, <=, >= # 比较
=~, !~, # 正则是否匹配
in, not in # 包含或者不包含
and, or , nand, xor, ! # 布尔操作符
{} # 分组操作符
5.imput plugin
input 插件是指定数据输入源,-个pileline可以有多个input插件
stdin # 标准标准输入,就是在命令行上输入
Codec # 类型为codec
type # 类型为string,自定义改事情类型,可用于后续判断
tags # 类型为array,自定义该事件tag,可用于后续判断
add_field # 类型为hash,为该事件添加字段
file
path # 类型是数组,指明读取文件的路径基于glob匹配 path => ["/var/log/**/*.log"]
exclue # 类型为数组排除不行监听的文件规则,基于glob匹配语法 exclue => "*.gz"
sincedb_path # 类型为字符串,记录sincedb文件路径
start_postion # 类型为字符串,beginning or end,是否从头读取文件beginning:从头开始读 end:当前开启的时候开始读
start_position => "beginning" # 从头读取,记录会存在sincedb中: 下次启动也以前读过的数据不会在重复读取
可以删除sincedb文件 让重启的时候从头开始读取
find / -name .sincedb_* 查找 然后删除查找出来的文件
/usr/local/logstash/logstash-7.7.0/data/plugins/inputs/file/.sincedb_32d545544e69851bf16f654e4bdeec4a
/usr/local/logstash/logstash-7.7.0/data/plugins/inputs/file/.sincedb_6186b45d486441e4a13c8fc65c76d586
sincedb_path => "/dev/null" # 配合调试使用 加上 start_position => "beginning" 每次重启读会从头读取一遍
start_interval # 类型为数值,单位是秒,定时检查文件是否有更新,默认是1秒
discover_interval # 类型为数值,单位是秒,定时检查是否有新文件呆读取,默认是16秒
ignore_older # 类型是数组,单位是秒,扫描文件列表,如果该文件上次更改时间超过设定时长,则不做处理,单依然会健康是否有新内容,默认关闭
close_older # 类型是位置,单位是秒,如果监听的文件在超过该设定的时候没有新的内容.会被关闭文件句柄,释放资源,单依然会监控释放有新内容,默认是3600秒,即一个小时
glob 匹配语法
* # 匹配任意字符,
** # 递归匹配之目录 /var/log/**/*.log 匹配/var/log目录及纸目录下的所有log文件
? # 匹配单一字符
[] # 匹配多个字符 [a-z]
{} # 匹配多个单纯 {"a", "b", "b"}
# 转义字符
kafka
kafka是最流行的消息队列,也是elastic stack架构中常用的,使用相对简单
input{
kafka{
zk_connect => "kafka:2181" # 连接地址
group_id => "logstash"
topic_id = > "apache_logs"
consumer_threads => 16 # 线程
}
}
6.codec plugin
codec plugin 作用于input和output plugin,负责将数据在原始与logstash event之间转换,创建的codec有
plain # 读取原始内容
dots # 将内容简化为点进行输出
rubydedug # 将logstash events 安装ruby格式输出,方便调试
line # 处理带换行符的内容
json # 处理json格式数据
nultiline # 处理多行数据内容
7.filter plugin
filter是logstash功能强大的主要原因,它可以对logstash event进行丰富的处理,比如解析数据,删除字段,类型转换等等,常见有如下几个
data 日期解析
grok 正则匹配解析
dissect 分隔符解析
mutate 对字段处理,比如重命名,删除,替换
json 安装json解析字段内容到制定字段中
geoip 增加地理位置数据
ruby 利用ruby代码来动态修改logstash event
8.Output plugin
负责将logstash event输出,创建的创建如下
stdout # 标准输出
file # 输出到文件
elasticsearch
9.建设阶段建议大家使用如下配置
http 作为input,方便输入测试数据,并且可以结合reload特征(stdin无法reload)
stdout做output,codec使用rubydebug,及时查看解析结果
测试错误输入情况下输出,以便对错误情况进行处理
input{http{port=>7474}}
filter{}
output{stdout{codec=>rubydebug}}