Logstash 设计了自己的 DSL —— 有点像 Puppet 的 DSL,或许因为都是用 Ruby 语言写的吧 —— 包括有区域,注释,数据类型(布尔值,字符串,数值,数组,哈希),条件判断,字段引用等。
一、基本语法组成
logstash.conf配置文件里至少需要有input和output两个部分构成
input {
#输入
}
filter {
#过滤匹配
}
output {
#输出
}
1、input配置
1.1、file{}(文件读取)
监听文件变化,记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位(也就是时间戳)
input {
file {
path => ["/var/log/access.log", "/var/log/message"] #监听文件路径
type => "system_log" #定义事件类型
start_position => "beginning" #检查时间戳
}
}
参数说明:
exclude :排除掉不想被监听的文件
stat_interval :logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
start_position :logstash 默认是从结束位置开始读取文件数据,也就是说 logstash 进程会以类似 tail -f 的形式运行。如果你是要导入原有数据,把这个设定改成 “beginning”,logstash 进程就按时间戳记录的地方开始读取,如果没有时间戳则从头开始读取,有点类似cat,但是读到最后一行不会终止,而是继续变成 tail -f。
1.2、codec(定义编码类型)
优化建议:直接输入预定义好的 JSON 数据,这样就可以省略掉 filter/grok 配置,从而减轻过滤器 logstash 的 CPU 负载消耗;具体操作如下:
1.2.1、修改nginx配置文件,添加如下
logformat json '{"@timestamp":"$time_iso8601",'
'"@version":"1",'
'"host":"$server_addr",'
'"client":"$remote_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,' #$request_time没有双引号表明该值为int类型
'"domain":"$host",'
'"url":"$uri",'
'"status":"$status"}';
access_log /var/log/nginx/access.log_json json;
1.2.2、重启 nginx 应用,然后修改input/file 区段配置成下面这样
input {
file {
path => "/var/log/nginx/access.log_json""
codec => "json"
}
}
2、filter过滤器配置
2.1、data(时间处理)
用来转换日志记录中的时间字符串,变成LogStash::Timestamp 对象,然后转存到 @timestamp 字段里。
注意:因为在稍后的 outputs/elasticsearch 中index常用的 %{+YYYY.MM.dd} 这种写法必须读取 @timestamp数据,所以一定不要直接删掉这个字段保留自己的时间字段,而是应该用 filters/date 转换后删除自己的字段!至于elasticsearch 中index使用 %{+YYYY.MM.dd}这种写法的原因后面会说明。
filter {
grok {
match => ["message", "%{HTTPDATE:logdate}"]
}
date {
match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
2.2、grok (正则匹配)
filter {
grok {
match => [ "message", "s+(?<status>d+?)s+" ] #跟python的正则有点差别
}
}
优化建议:如果把 “message” 里所有的信息都 grok 到不同的字段了,数据实质上就相当于是重复存储了两份。所以可以用 remove_field 参数来删除掉 message 字段,或者用 overwrite 参数来重写默认的 message 字段,只保留最重要的部分。
filter {
grok {
patterns_dir => "/path/to/your/own/patterns"
match => {
"message" => "%{SYSLOGBASE} %{DATA:message}"
}
overwrite => ["message"]
}
}
filter {
grok {
match => ["message", "%{HTTPDATE:logdate}"]
remove_field => ["logdate"]
}
}
2.3、GeoIP (地址查询归类)
GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。
filter {
geoip {
source => "clientip"
database => "/etc/logstash/GeoLiteCity.dat" #需去官网下载ip库放到本地
}
}
filter {
geoip {
source => "message" #如果能联网可查询在线ip库
}
}
注:geoip 插件的 “source” 字段可以是任一处理后的字段,比如 “clientip”,但是字段内容却需要小心!geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip.字段。
所以在测试时,如果使用了诸如 127.0.0.1, 172.16.0.1, 182.168.0.1, 10.0.0.1 等内网地址,会发现没有对应输出!
GeoIP 库数据较多,如果不需要这么多内容,可以通过 fields 选项指定自己所需要的。下例为全部可选内容
filter {
geoip {
fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
}
}