在网上搜了很多资料，都没有找到现成的配置。我刚接触 Logstash，写了一个配置，分享给刚入门的新手们，希望能少些烦恼。该配置只针对 canal 产生的数据进行过滤；如果数据来自其他方式，可以参考处理。
# Logstash pipeline: read canal change events (JSON) from Kafka, turn every
# changed row into its own event with the row's columns as top-level fields,
# and apply it to Elasticsearch as an index / update / delete action.
input {
  kafka {
    # Broker list given as a single "host:port" string.
    bootstrap_servers => "127.0.0.1:9092"
    #group_id => "goodsoptiones"
    # Start from the most recent data.
    auto_offset_reset => "latest"
    # boolean (optional), default: false
    # (author's note) Where Logstash starts reading after startup. The default
    # is the end position, i.e. Logstash resumes from the offset at which it
    # last stopped; if nothing has been consumed before, it starts from the
    # beginning. To import existing data, change this setting to "true".
    #reset_beginning => true
    # Number of consumer threads.
    #consumer_threads => 5
    topics => ["test"]
    codec => json { charset => "UTF-8" }
  }
}

filter {
  # Only the three change types mapped by the translate filter below can be
  # turned into an Elasticsearch action. Everything else is dropped; this
  # includes the schema-change ("ALTER") log entries the original rule
  # filtered out. Without this, unmapped types would reach the output with a
  # literal "%{op_type}" action.
  if [type] not in ["INSERT", "UPDATE", "DELETE"] {
    drop {}
  }

  # "data" holds the changed rows; emit one event per row.
  split {
    field => "data"
  }

  # Resolve the action BEFORE the row's columns are copied to the top level,
  # so a column that happens to be called "type" cannot shadow the change type.
  # NOTE(review): newer releases of the translate filter rename
  # field/destination to source/target - confirm against the installed
  # plugin version (the file paths below suggest Logstash 7.8.0).
  translate {
    field => "type"
    destination => "op_type"
    dictionary => {
      "INSERT" => "index"
      "UPDATE" => "update"
      "DELETE" => "delete"
    }
  }

  # Strip the event metadata first, for the same reason: removing these names
  # after flattening would also delete row columns that share a name with them.
  mutate {
    remove_field => ["database", "pkNames", "mysqlType", "table", "sql", "sqlType", "old", "es", "isDdl", "ts", "type"]
  }

  # Promote every column of the row to a top-level field and discard the
  # "data" wrapper. If "data" is not a hash (e.g. the split above did not
  # apply), tag the event instead of raising inside the filter.
  ruby {
    code => "
      row = event.get('data')
      if row.is_a?(Hash)
        row.each { |column, value| event.set(column, value) }
        event.remove('data')
      else
        event.tag('_canal_data_not_hash')
      end
    "
  }
}

output {
  #file {
  #  path => "D:/zip/logstash-7.8.0/logs/kafkalog.log"
  #  flush_interval => 0
  #}

  # Print every event for debugging.
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "goodsoption"
    document_type => "_doc"
    # The row's "id" column (flattened by the ruby filter) is the document id.
    document_id => "%{id}"
    #document_id => "142800"
    # Action computed by the translate filter: index, update or delete.
    action => "%{op_type}"
    #action => "update"
    #user => "elastic"
    #password => "changeme"
  }
}