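  • filebeat-2: connecting filebeat to logstash through a Kafka queue

    When filebeat ships directly to logstash, logstash's design can cause blocking, so a message queue is placed between the two to decouple them.

    Redis or Kafka can be used; Kafka is used here.

    1, Installation

    Kafka works as soon as the archive is unpacked, but it needs ZooKeeper. A ZooKeeper is bundled with it and can be used directly.

    1), Start the bundled ZooKeeper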

    ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

    2), Edit the Kafka configuration file

    vim ./config/server.properties

    ############################# Server Basics #############################
    broker.id=0
    delete.topic.enable=true
     
    ############################# Socket Server Settings #############################
    listeners=PLAINTEXT://0.0.0.0:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
     
    ############################# Log Basics #############################
    log.dirs=/tmp/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
     
    ############################# Log Flush Policy #############################
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
     
    ############################# Log Retention Policy #############################
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
     
    ############################# Zookeeper #############################
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000

    3), Start the Kafka server

    ./bin/kafka-server-start.sh ./config/server.properties &

    4), Edit the filebeat configuration file; this is its final form

    cat ./elk/filebeat-5.5.2-linux-x86_64/filebeat.yml | grep -v '#' | grep -v '^$'
    filebeat.prospectors:
    - input_type: log
      paths:
        - /var/log/nginx/*.log
      encoding: utf-8
      document_type: my-nginx-log
      scan_frequency: 5s
      harvester_buffer_size: 16384
      max_bytes: 10485760
      tail_files: true
    output.kafka:
      enabled: true
      hosts: ["www.wenbronk.com:9092"]
      topic: elk-%{[type]}
      worker: 2
      max_retries: 3
      bulk_max_size: 2048
      timeout: 30s
      broker_timeout: 10s
      channel_buffer_size: 256
      keep_alive: 60
      compression: gzip
      max_message_bytes: 1000000
      required_acks: 0
      client_id: beats

    5), Restart filebeat

    ./filebeat -c ./filebeat.yml &

    6), Edit the logstash input

    input {
        kafka  {
          #codec => "json"
          topics_pattern => "elk-.*"
          bootstrap_servers => "127.0.0.1:9092"
          auto_offset_reset => "latest"
          group_id => "logstash-g1"
        }
    }
    output {
        elasticsearch {                                  # Logstash outputs to elasticsearch
          hosts => ["localhost:9200"]                    # elasticsearch runs locally
          index => "logstash-nginx-%{+YYYY.MM.dd}"       # index to create
          document_type => "nginx"                       # document type
          workers => 1                                   # number of workers
          user => "elastic"                              # elasticsearch user
          password => "changeme"                         # elasticsearch password
          flush_size => 20000
          idle_flush_time => 10
        }
    }

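    7), Restart logstash

    8), Request a page from nginx; the messages can then be viewed in the message queue. The topic name is elk- followed by the document_type set in filebeat.yml

    ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic elk-my-nginx-log --from-beginning

    Reference: http://www.ywnds.com/?p=9776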
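
An added example, not part of the original post: a small Python check that flags the settings from this post's server.properties, filebeat.yml and logstash output that suit a single-host test but should change before the pipeline carries real logs. The rule list and the function name `audit` are this example's own, and the sample input is constructed from the lines shown above.

```python
import re

# Each rule: (setting name, predicate on its value, finding text).
RULES = [
    ("listeners", lambda v: v.upper().startswith("PLAINTEXT://"),
     "broker listener has no TLS and no authentication"),
    ("listeners", lambda v: "://0.0.0.0:" in v,
     "broker listens on every interface"),
    ("log.dirs", lambda v: v.startswith("/tmp/"),
     "Kafka data lives under /tmp and can be removed by tmp cleanup"),
    ("delete.topic.enable", lambda v: v.lower() == "true",
     "topic deletion is on; without broker authentication any client can use it"),
    ("required_acks", lambda v: v == "0",
     "filebeat does not wait for broker acks, so lost events go unnoticed"),
    ("password", lambda v: v == "changeme",
     "well-known default password used for Elasticsearch"),
]

# Matches key=value (properties), key: value (YAML) and key => value (logstash).
LINE = re.compile(r"^\s*([\w.]+)\s*(?:=>|=|:)\s*(.*)$")

def audit(text):
    """Return (line_no, setting, finding) for every risky setting in text."""
    findings = []
    for no, line in enumerate(text.splitlines(), 1):
        m = LINE.match(line)
        if not m:
            continue  # comments, list items, braces
        key = m.group(1)
        value = m.group(2).split("#")[0].strip().strip("\"'")
        for name, risky, msg in RULES:
            if key == name and risky(value):
                findings.append((no, key, msg))
    return findings

# Constructed sample: the relevant lines from the three files in this post.
SAMPLE = """\
listeners=PLAINTEXT://0.0.0.0:9092
delete.topic.enable=true
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
  required_acks: 0
  compression: gzip
      password => changeme   # elasticsearch password
"""

if __name__ == "__main__":
    for no, key, msg in audit(SAMPLE):
        print(f"line {no}: {key}: {msg}")
```

Run it against the real files by passing their contents to `audit`; an empty list means none of these six conditions matched.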

  • Original URL: https://www.cnblogs.com/wenbronk/p/7412141.html