zoukankan      html  css  js  c++  java
  • canal+kafka+logstash+es 架构 logstash的配置

    网上搜了很多资料都没有看到配置  刚接触logstash  写了一个配置  分享给 刚入手的小白们 少些烦恼 只针对 canal 产生的数据 进行的过滤 如果是其他方式的 可以参考处理

    input {
        kafka{
                bootstrap_servers => ["127.0.0.1:9092"]
                #group_id => "goodsoptiones"
                 # 拉取最近数据
                 auto_offset_reset => "latest"
                 # boolean (optional), default: false
                 #logstash 启动后从什么位置开始读取数据,默认是结束位置,也就是说 logstash 进程会以从上次读取结束时的偏移量开始继续读取,如果之前没有消费过,那么就开始从头读取.如果你是要导入原有数据,把这个设定改成 "true"
                 #reset_beginning => true
                 # 使用的线程数
                 #consumer_threads => 5
                topics => ["test"]
                codec => json {charset => "UTF-8"}
            }
    }
    
    filter{
       if [type]=="ALTER"{ #过滤掉修改结构的log
          drop{}

        } split{ field
    => "data" } ruby{ code => " event.get('data').each {|k, v| event.set(k, v) } event.remove('data') " } translate{ field => "type" destination => "op_type" dictionary =>[ "INSERT","index", "UPDATE","update", "DELETE","delete" ] } mutate{ remove_field =>["database","pkNames","mysqlType","table","sql","sqlType","old","es","isDdl","ts","type"] } } output { #file { # path => "D:/zip/logstash-7.8.0/logs/kafkalog.log" # flush_interval => 0 #} stdout { codec => rubydebug } elasticsearch { hosts => ["http://localhost:9200"] index => "goodsoption" document_type => "_doc" document_id => "%{id}" #document_id => "142800" action=>"%{op_type}" #action=>"update" #user => "elastic" #password => "changeme" } }
  • 相关阅读:
    创业团队的狼性管理:慈不掌兵 义不聚财
    爱德华·斯诺登
    塞班岛
    大兴雷克萨斯深度剖析2013款LS460L_深圳大兴雷克萨斯_太平洋汽车网
    金字塔底端_百度百科
    左立_百度百科
    志大才疏_百度百科
    JS~json日期格式化
    MVVM架构~knockoutjs系列之从Knockout.Validation.js源码中学习它的用法
    JS~重写alter与confirm,让它们变成fancybox风格
  • 原文地址:https://www.cnblogs.com/zhaoxueru/p/13710709.html
Copyright © 2011-2022 走看看