{ "message" => "192.168.11.186,192.168.11.187 48391,3306 Dec 7, 2016 13:26:25.134545378 SELECT \x0a r.trx_id waiting_trx_id,\x0a r.trx_mysql_thread_id waiting_thread,\x0a r.trx_query waiting_query,\x0a b.trx_id blocking_trx_id,\x0a b.trx_mysql_thread_id blocking_thread,\x0a b.trx_query blocking_query\x0aFROM\x0a information_schema.innodb_lock_waits w\x0a INNER JOIN\x0a information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id\x0a INNER JOIN\x0a information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id", "@version" => "1", "@timestamp" => "2016-12-07T05:26:26.724Z", "path" => "/data01/audit/20161207_192.168.11.187.txt", "host" => "Vsftp", "type" => "audit-database-192.168.11.187", "clientip" => "192.168.11.186", "serverip" => "192.168.11.187", "client_port" => "48391", "server_port" => "3306", "time" => "Dec 7, 2016 13:26:25.134545378", "running_sql" => "SELECT r.trx_id waiting_trx_id, r.trx_mysql_thread_id waiting_thread, r.trx_query waiting_query, b.trx_id blocking_trx_id, b.trx_mysql_thread_id blocking_thread, b.trx_query blocking_query FROM information_schema.innodb_lock_waits w INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id" } { "message" => "192.168.11.186,192.168.11.187 52481,3306 Dec 7, 2016 13:28:02.753832471 SELECT NOW(), (UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(a.trx_started)) diff_sec, b.id, b.user, b.host, b.db FROM information_schema.innodb_trx a INNER JOIN information_schema.PROCESSLIST b ON a.TRX_MYSQL_THREAD_ID = b.id", "@version" => "1", "@timestamp" => "2016-12-07T05:28:03.459Z", "path" => "/data01/audit/20161207_192.168.11.187.txt", "host" => "Vsftp", "type" => "audit-database-192.168.11.187", "clientip" => "192.168.11.186", "serverip" => "192.168.11.187", "client_port" => "52481", "server_port" => "3306", "time" => "Dec 7, 2016 13:28:02.753832471", "running_sql" => "SELECT NOW(), (UNIX_TIMESTAMP(NOW()) - 
UNIX_TIMESTAMP(a.trx_started)) diff_sec, b.id, b.user, b.host, b.db FROM information_schema.innodb_trx a INNER JOIN information_schema.PROCESSLIST b ON a.TRX_MYSQL_THREAD_ID = b.id" }
[elk@Vsftp audit]$ cat logstash-audit.conf
# Reads the audit capture files for 192.168.11.187, splits each line into
# client/server address, ports, timestamp and SQL text, and pushes the
# resulting events onto a Redis list.
#
# Expected line layout (see the sample events above):
#   <client_ip>,<server_ip> <client_port>,<server_port> <timestamp> <sql text>
input {
  file {
    type => "audit-database-192.168.11.187"
    path => ["/data01/audit/*_192.168.11.187.txt"]
  }
}

filter {
  grok {
    # (?m) makes "." also match newline characters (Ruby/Oniguruma syntax).
    # The shorthand classes must keep their backslashes (\s, \S, \d): written
    # as bare "s+" / "S+" / "d+" they match literal letters and the pattern
    # can never match a line such as
    #   192.168.11.186,192.168.11.187 48391,3306 Dec 7, 2016 13:26:25.134545378 SELECT ...
    # The dot before the fractional seconds is escaped so it only matches ".".
    match => [
      "message",
      "(?m)%{IPORHOST:clientip},%{IPORHOST:serverip}\s+(?<client_port>\S+),(?<server_port>\S+)\s+(?<time>(\S+\s+).*?[0-9]{2}:[0-9]{2}:[0-9]{2}\.\d+)\s+(?<running_sql>(\S+\s+).*)"
    ]
  }

  mutate {
    # gsub takes ONE array of (field, regex, replacement) triples. Declaring
    # "gsub" twice left "message" unprocessed in the sample output, so both
    # substitutions are listed in a single setting.
    # The capture contains the literal four characters \x0a (not a real LF),
    # so the backslash is escaped in the regex.
    # NOTE(review): assumes config.support_escapes is left at its default
    # (disabled) -- confirm; if it is enabled the backslashes must be doubled again.
    gsub => [
      "message", "\\x0a", " ",
      "running_sql", "\\x0a", " "
    ]
  }
}

output {
  if [type] == "audit-database-192.168.11.187" {
    redis {
      host => "192.168.11.185"
      data_type => "list"
      key => "audit-database-192.168.11.187:redis"
      port => "6379"
      # NOTE(review): plaintext credential in the config file; consider
      # supplying it through an environment variable or the Logstash keystore.
      password => "1234567"
    }
  }
}