zoukankan      html  css  js  c++  java
  • ELK同步kafka带有key的Message

    需求

    kafka中的message带有key,带有相同key值的message后入kafka的意味着更新message,message值为null则意味着删除message。
    用logstash来同步kafka中这样的数据入Elasticsearch,从而实现可以同步增删改数据。

    环境

    1) logstash-6.5.4 
    2) kafka中topic的message带有key
    

    解决(Logstash的配置如下)

    input {
        kafka {
            bootstrap_servers=> "192.168.31.92:9092"
            group_id => "test_group"
            topics => ["test_topic"]
            type => "test_type"
            auto_offset_reset => earliest
            consumer_threads => 1
            decorate_events => true
            codec => plain {
                format => ""
            }
        }
    }
    
    filter{
        if ([message] == "") {
            mutate {
                add_field => { "@esaction" => "delete"}
            }
    	mutate {
                remove_field => ["@version"]
            }
        } else {
            json {
                source => "message"
            }
    
            mutate {
                remove_field => ["@version","message"]
            }
    
            mutate {
                add_field => { "@esaction" => "index"}
            }
    
            date {
                match => ["updated","UNIX_MS"]
    	    target => "@timestamp"
            }
        }
    }
    
    output {
        elasticsearch {
            hosts => ["192.168.21.80:9200"]
            index => "test_index"
            document_id => "%{[@metadata][kafka][key]}"
            action => "%{[@esaction]}"
            codec => "json"
        }
    }
    
  • 相关阅读:
    k8s前期部署准备
    树莓派安装Gitlab-runner
    GitLab CI/CD 报错
    测试
    LVS结合keepalive
    LVS实现负载均衡安装配置详解
    LVS实现负载均衡原理
    私有仓库 gitlab 部署笔记
    Docker 案例: 在容器中部署静态网站
    docker 容器的启动方式
  • 原文地址:https://www.cnblogs.com/itwild/p/10316855.html
Copyright © 2011-2022 走看看