Json filter
[elk@db01 0204]$ cat json_filter.conf
input {
    stdin {}
}
filter {
    json {
        source => "message"
    }
}
output {
    stdout { codec => rubydebug }
}
[elk@db01 0204]$ logstash -f json_filter.conf
Settings: Default pipeline workers: 4
Pipeline main started
{"name":"xx","age":23}
{
    "message" => "{\"name\":\"xx\",\"age\":23}",
    "@version" => "1",
    "@timestamp" => "2017-01-17T23:22:08.581Z",
    "host" => "db01",
    "name" => "xx",
    "age" => 23
}
What about adding target?
[elk@db01 0204]$ cat json_filter.conf
input {
    stdin {}
}
filter {
    json {
        source => "message"
        target => "scan"
    }
}
output {
    stdout { codec => rubydebug }
}
[elk@db01 0204]$ logstash -f json_filter.conf
Settings: Default pipeline workers: 4
Pipeline main started
{"name":"xx","age":23}
{
    "message" => "{\"name\":\"xx\",\"age\":23}",
    "@version" => "1",
    "@timestamp" => "2017-01-17T23:25:07.111Z",
    "host" => "db01",
    "scan" => {
        "name" => "xx",
        "age" => 23
    }
}
Grok filter
grok is currently the best tool in Logstash for parsing all kinds of unstructured log data.
%{IP:ip}
IP is the name of a built-in pattern; the lowercase ip is the custom field name the matched value is stored under.
break_on_match defaults to true: once the first pattern matches, the remaining patterns are not tried (see the sketch below).
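A minimal sketch of break_on_match with several patterns listed for the same field (the two patterns here are illustrative, not taken from the transcript): with the default, grok stops at the first pattern that matches; set it to false to keep applying the remaining patterns as well.
filter {
    grok {
        # with the default break_on_match => true, grok stops at the first matching pattern
        match => { "message" => [ "%{IP:ip} %{WORD:method}", "%{IP:ip}" ] }
        # uncomment to keep trying the remaining patterns after a successful match
        # break_on_match => false
    }
}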
[elk@db01 0204]$ cat nginx.conf
input {
    file {
        path => ["/home/elk/0204/nginx.log"]
        type => "nginx"
        start_position => "beginning"
    }
}
filter {
    grok {
        match => { "message" => "%{IP:ip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
    }
}
output {
    stdout { codec => rubydebug }
}
[elk@db01 0204]$ cat nginx.log
9.9.8.6 GET /xx.hmtl 343 44
The config is then revised to use %{IPORHOST} and \s+ between the fields:
input {
    file {
        path => ["/home/elk/0204/nginx.log"]
        type => "nginx"
        start_position => "beginning"
    }
}
filter {
    grok {
        match => { "message" => "%{IPORHOST:clientip}\s+%{WORD:method}\s+%{URIPATHPARAM:request}\s+%{NUMBER:bytes}\s+%{NUMBER:duration}" }
    }
}
output {
    stdout { codec => rubydebug }
}
[elk@db01 0204]$ logstash -f nginx.conf
Settings: Default pipeline workers: 4
Pipeline main started
{
    "message" => "9.9.8.6 GET /xx.hmtl 343 44",
    "@version" => "1",
    "@timestamp" => "2017-01-18T00:12:37.490Z",
    "path" => "/home/elk/0204/nginx.log",
    "host" => "db01",
    "type" => "nginx",
    "clientip" => "9.9.8.6",
    "method" => "GET",
    "request" => "/xx.hmtl",
    "bytes" => "343",
    "duration" => "44"
}
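Note that bytes and duration come out as strings. If numeric fields are wanted, grok accepts an optional type suffix on the capture; a sketch based on the same pattern:
filter {
    grok {
        # :int converts the captured text to an integer in the event
        match => { "message" => "%{IPORHOST:clientip}\s+%{WORD:method}\s+%{URIPATHPARAM:request}\s+%{NUMBER:bytes:int}\s+%{NUMBER:duration:int}" }
    }
}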
Removing the message field:
[elk@db01 0204]$ cat nginx.conf
input {
    file {
        path => ["/home/elk/0204/nginx.log"]
        type => "nginx"
        start_position => "beginning"
    }
}
filter {
    grok {
        match => { "message" => "%{IPORHOST:clientip}\s+%{WORD:method}\s+%{URIPATHPARAM:request}\s+%{NUMBER:bytes}\s+%{NUMBER:duration}" }
        remove_field => ["message"]
    }
}
output {
    stdout { codec => rubydebug }
}
[elk@db01 0204]$ logstash -f nginx.conf
Settings: Default pipeline workers: 4
Pipeline main started
{
    "@version" => "1",
    "@timestamp" => "2017-01-18T00:15:03.879Z",
    "path" => "/home/elk/0204/nginx.log",
    "host" => "db01",
    "type" => "nginx",
    "clientip" => "55.9.3.6",
    "method" => "GET",
    "request" => "/zz.xml",
    "bytes" => "3",
    "duration" => "44"
}
kv plugin
field_split defines the delimiter characters that separate the key/value pairs.
[elk@db01 0204]$ cat kv.conf
input {
    stdin {}
}
filter {
    kv {
        field_split => "&?"
    }
}
output {
    stdout {
        codec => rubydebug
    }
}
[elk@db01 0204]$ logstash -f kv.conf
Settings: Default pipeline workers: 4
Pipeline main started
https://www.baidu.com/s?wd=奥巴马&rsv_spt=1&rsv_iqid=0x90dd7e610001f239&issp=1&f=3&rsv_bp=1&rsv_idx=2&ie=utf-8&tn=
baiduhome_pg&rsv_enter=0&oq=奥巴马&rsv_t=b39atb4WgjYrHvo4SnLlmez2VMymtEWBoQPRTiUrWZcluDRfAVZ5R%2F%2FFyzJ2KKaX
FMIv&rsv_pq=b374731e0000037a&prefixsug=奥巴马&rsp=0
{
    "message" => "https://www.baidu.com/s?wd=奥巴马&rsv_spt=1&rsv_iqid=0x90dd7e610001f239&issp=1&f=3&rsv_bp=1&rsv_idx=2&ie=utf-8&tn=",
    "@version" => "1",
    "@timestamp" => "2017-01-18T00:25:06.444Z",
    "host" => "db01",
    "wd" => "奥巴马",
    "rsv_spt" => "1",
    "rsv_iqid" => "0x90dd7e610001f239",
    "issp" => "1",
    "f" => "3",
    "rsv_bp" => "1",
    "rsv_idx" => "2",
    "ie" => "utf-8"
}
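Like the json filter, kv also supports target to nest the parsed pairs under a single field instead of adding them at the top level of the event; a minimal sketch (the field name params is just an example):
filter {
    kv {
        field_split => "&?"
        # put all parsed key/value pairs under a "params" field instead of the event root
        target => "params"
    }
}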