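1. Logstash 6.5.3
Configuring collection of MongoDB logs:
First, deploy Filebeat on the MongoDB server to collect the log and ship it to Logstash for processing; Logstash then forwards the parsed events to Elasticsearch (ES).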
filebeat-conf:
- input_type: log
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /data/log/mongod.log
  tags: ['db']
output.logstash:
  hosts: ["10.1.1.12:5044"]
Only the main settings are shown here.
logstash-conf:
input {
  beats {
    port => "5044"
  }
}
filter {
  if 'db' in [tags] {
    # Split each mongod line into timestamp, severity, component, context and body.
    grok {
      match => ["message","%{TIMESTAMP_ISO8601:timestamp}\s+%{MONGO3_SEVERITY:severity}\s+%{MONGO3_COMPONENT:component}%{SPACE}(?:\[%{DATA:context}\])?\s+%{GREEDYDATA:body}"]
      remove_field => [ "message", "beats_input_codec_plain_applied" ]
    }
    # Slow operations end with the elapsed time in milliseconds.
    if [body] =~ "ms$" {
      grok {
        match => ["body","command\s+%{DATA:collection}\s+command:\s+%{DATA:action}\s+.*\s+query:\s+%{DATA:query}\s+planSummary+.*\s+%{NUMBER:spend_time:int}ms$"]
      }
    }
    # Aggregations carry a pipeline instead of a query.
    if [body] =~ "aggregate" {
      grok {
        match => ["body","command\s+%{DATA:collection}\s+command:\s+%{DATA:action}\s+.*\s+pipeline:\s+%{DATA:pipeline}\s+keyUpdates+.*\s+%{NUMBER:spend_time:int}ms"]
      }
    }
    # find commands carry a filter.
    if [body] =~ "find" {
      grok {
        match => ["body","command\s+%{DATA:collection}\s+command:\s+%{DATA:action}\s+.*\s+filter:\s+%{DATA:filter}\s+planSummary+.*\s+%{NUMBER:spend_time:int}ms"]
      }
    }
    # Use the log line's own timestamp as the event time.
    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
  }
}
output {
  if 'db' in [tags] {
    elasticsearch {
      hosts => "192.4.7.16:9200"
      index => "logstash-mongodb-slow-%{+YYYY-MM-dd}"
    }
  }
}
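To make the filter stage more concrete, here is a minimal Python sketch that approximates two of the grok expressions (the line header and the "find" body) with plain regular expressions. It is only an illustration, not how Logstash evaluates grok: the sample log line is hypothetical, written to resemble the MongoDB 3.x layout, and the simplified timestamp and number sub-patterns are assumptions standing in for TIMESTAMP_ISO8601 and NUMBER.

```python
import re

# Hypothetical sample line in the MongoDB 3.x log layout; not taken from a real server.
SAMPLE = (
    '2018-12-20T10:15:30.123+0800 I COMMAND  [conn42] '
    'command shop.orders command: find { find: "orders", filter: { status: "A" } } '
    'planSummary: COLLSCAN keysExamined:0 docsExamined:1000 protocol:op_query 153ms'
)

# Rough stand-in for the first grok expression:
# timestamp, one-letter severity, component, optional [context], then the body.
HEADER = re.compile(
    r'(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}[+-]\d{4})\s+'
    r'(?P<severity>\w)\s+'
    r'(?P<component>\w+|-)\s*'
    r'(?:\[(?P<context>.*?)\])?\s+'
    r'(?P<body>.*)'
)

# Rough stand-in for the "find" grok expression applied to the body field.
FIND = re.compile(
    r'command\s+(?P<collection>.*?)\s+command:\s+(?P<action>.*?)\s+.*\s+'
    r'filter:\s+(?P<filter>.*?)\s+planSummary+.*\s+(?P<spend_time>\d+)ms'
)


def parse(line):
    """Return the extracted fields as a dict, or None if the header does not match."""
    head = HEADER.match(line)
    if head is None:
        return None
    fields = head.groupdict()
    slow = FIND.search(fields['body'])
    if slow is not None:
        fields.update(slow.groupdict())
        fields['spend_time'] = int(fields['spend_time'])  # mirrors the :int cast in grok
    return fields


fields = parse(SAMPLE)
print(fields['component'], fields['context'], fields['collection'], fields['spend_time'])
```

For the sample line above this prints `COMMAND conn42 shop.orders 153`. A line that does not start with a timestamp makes `parse` return `None`, which corresponds roughly to the event that Logstash would tag with `_grokparsefailure`.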
Grok expressions should be tested before they go into the pipeline; Kibana 6.3 and later ships with a Grok Debugger for this:
Test result: