[elk@dr-mysql01 mysql]$ cat logstash_mysql.conf
# Logstash pipeline for MySQL slow-query logs.
#
# Reads two sets of slow-log files (tagged by `type`), folds each multi-line
# slow-log entry into one event, drops "SELECT SLEEP" entries, extracts the
# user / client IP / timing / row-count / query fields, and pushes the event
# onto a per-type Redis list.

input {
  file {
    type => "zj_mysql"
    path => ["/data01/applog_backup/zjzc_log/zj-mysql01-slowlog.*"]
    # A line matching "# User@Host:" starts a new event; every line that does
    # NOT match (negate => true) is appended to the previous event.
    # `\s*` (rather than `\s+`) lets the header line start at column 0 as well
    # as after leading whitespace.
    codec => multiline {
      pattern => "^\s*#\s+User@Host:"
      negate => true
      what => "previous"
    }
  }

  file {
    type => "wj_mysql"
    path => ["/data01/applog_backup/winfae_log/wj-mysql01-slowlog.*"]
    # Same event framing as the zj_mysql input above.
    codec => multiline {
      pattern => "^\s*#\s+User@Host:"
      negate => true
      what => "previous"
    }
  }
}

filter {
  # drop sleep events
  grok {
    match => { "message" => "SELECT SLEEP" }
    add_tag => [ "sleep_drop" ]
    tag_on_failure => [] # prevent default _grokparsefailure tag on real records
  }

  if "sleep_drop" in [tags] {
    drop {}
  }

  # Parse the slow-log header lines and the statement that follows them.
  # Captured fields: user, clientip, id, Query_time, lock_time, rows_sent,
  # rows_examined, timestamp, query.
  # The regex escapes (\s, \S, \[, \], \n) are required: without them the
  # expression matches literal letters and a character class instead of
  # whitespace and the bracketed user / IP.
  grok {
    match => [ "message", "(?m)\s*# User@Host:\s+\S+\[%{USER:user}\]\s+@\s+\[%{IP:clientip}\]\s+(?<id>(\S+\s+)*\S+)\s*#\s+Query_time:\s+%{NUMBER:Query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\s*\n\s*SET\s+timestamp=%{NUMBER:timestamp};\s*(?<query>(\s*\S+\s*).*)\s*" ]
  }

  # Use the epoch value from "SET timestamp=..." as the event time, then
  # discard the now-redundant field.
  date {
    match => [ "timestamp", "UNIX" ]
    remove_field => [ "timestamp" ]
  }
}

output {
  # Route each event to the Redis list that corresponds to its input type.
  # NOTE(review): the Redis password is stored here in plain text; consider
  # supplying it through an environment variable or the Logstash keystore.
  if [type] == "zj_mysql" {
    redis {
      host => "192.168.32.67"
      data_type => "list"
      key => "zj_mysql:redis"
      port => "6379"
      password => "1234567"
    }
  } else if [type] == "wj_mysql" {
    redis {
      host => "192.168.32.67"
      data_type => "list"
      key => "wj_mysql:redis"
      port => "6379"
      password => "1234567"
    }
  }
}