zoukankan      html  css  js  c++  java
  • 使用logstash从Kafka中拉取数据并传输给elasticsearch且创建相应索引的操作

    注意事项:默认Kafka传递给elastci的数据是在'data'字段,且不包含其他数据,所以需要使用额外的操作进行处理

    logstash配置文件操作

    input {
    
      kafka {
        bootstrap_servers => "172.17.107.187:9092,172.17.107.187:9093,172.17.107.187:9094"  # 字符串形式,kafka集群地址
        auto_offset_reset => "latest" # 拉取最近数据
        consumer_threads => 5 # 使用的线程数
        decorate_events => true   # 传递给elastci的数据增加附加数据
        topics => ["test_canal_topic"] # 拉取的kafka的指定topic
        tags => ["canal"] # 标签,额外使用该参数可以在elastci中创建不同索引
      }
      
    }
    
    
    filter {
      # 把默认的data字段重命名为message字段,方便在elastic中显示
      mutate {
        rename => ["data", "message"]
      }
      
      # 还可以使用其他的处理方式,在此就不再列出来了
    }
    
    output {
    
      elasticsearch {
        hosts => ["http://172.17.107.187:9203", "http://172.17.107.187:9201","http://172.17.107.187:9202"]
        index => "filebeat_%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}" # decorate_events=true的作用,可以使用metadata中的数据
        user => "elastic"
        password => "escluter123456"
      }
    
    }
    
    
  • 相关阅读:
    程序员修炼之道阅读笔记1
    构建之法阅读笔记6
    构建之法阅读笔记5
    构建之法阅读笔记4
    构建之法阅读笔记3
    构建之法阅读笔记2
    关于搜狗输入法的评价
    找水王
    用户场景
    个人工作总结
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/11634352.html
Copyright © 2011-2022 走看看