filebeat配置列表
filebeat搜集的日志输出到redis
#prospectors config
filebeat.prospectors:
- input_type: log
paths:
- /opt/logs/PROD_XTZJ_BPMS-API_1721913167_10000/1.log
encoding: plain
document_type: bpms
multiline.pattern: ^[0-9]
multiline.negate: true
multiline.match: after
#global config
filebeat.registry_file: ${path.data}/registry-bpms
#output.redis config
output.redis:
hosts: ["xxx.xxx.xxx.xxx:port", "xxx.xxx.xxx.xxx:port", "xxx.xxx.xxx.xxx:port"]
key: filebeat-java
datatype: list
loadbalance: true
elasticsearch配置文件
elasticsearch.yml
cluster.name: xxx node.name: node-2 bootstrap.memory_lock: true network.host: xxx.xxx.xxx.xxx http.port: 9200 transport.tcp.port: 9300 discovery.zen.ping.unicast.hosts: ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx","xxx.xxx.xxx.xxx"] discovery.zen.minimum_master_nodes: 2 http.cors.enabled: true http.cors.allow-origin: "*"
logstash配置文件
filebaet-java-to-es.conf
input {
redis {
data_type => "list" #value type is STRING
key => "filebeat-java" #value type is STRING
host => "xxx.xxx.xxx.xxx" #value type is STRING
port => 6379 #value type is NUMBER,Default value is 6379
}
redis {
data_type => "list"
key => "filebeat-java"
host => "xxx.xxx.xxx.xxx"
port => 6379
}
redis {
data_type => "list"
key => "filebeat-java"
host => "xxx.xxx.xxx.xxx"
port => 6379
}
}
filter {
if [type] == "pre_qcredit" {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp}s+[%{GREEDYDATA:thread}]s+%{DATA:level}s+%{DATA:class}s+"
}
}
}else if [type] == "prod_qkpie" {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp}s+[%{GREEDYDATA:thread}]s+%{DATA:level}s+%{DATA:class}s+"
}
}
}else if [type] in ["prod_repayment-base", "prod_repayment-api"] {
grok {
match => {
"message" => "[%{LOGLEVEL:Level}?s*]s+%{TIMESTAMP_ISO8601:timestamp}s+--%{DATA:thread}--s+[%{DATA:logger}]s+%{GREEDYDATA:logInfo}"
}
}
}else if [type] in ["filter_bpms_platform", "filter_bpms_api", "filter_bpms_monitor", "filter_brms_api", "filter_prod_ndes", "filter_tsp", "filter_data_pretreatment", "filter_pboc_service", "filter_pboc_task"] {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp}s+[%{DATA:thread}]s+%{DATA:level}s+%{DATA:class}s+-s+[%{DATA:bizId}]%{DATA:sourceSystem},%{DATA:targetSystem},%{DATA:interface},%{DATA:isSuccess},%{DATA:timeUse},%{GREEDYDATA:errormessage}"
}
}
mutate {
convert => { "timeUse" => "integer" }
}
} else {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp}s+%{DATA:thread}s+%{DATA:level}s+%{DATA:class}s+"
}
}
}
#用日志输出时间替换掉ES的@timestamp
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
}
output {
elasticsearch {
hosts => ["xxx.xxx.xxx.xxx:9200", "xxx.xxx.xxx.xxx:9200", "xxx.xxx.xxx.xxx:9200"] #value type is ARRAY
index => "%{type}-%{+YYYY.MM.dd}" #YYYY.MM.dd get from @timestamp field
template_name => "logstash2"
pool_max_per_route => 300
flush_size => 2000 #value type is NUMBER,Default value is 500
idle_flush_time => 5 #value type is NUMBER,Default value is 1
}
}