zoukankan      html  css  js  c++  java
  • logstash收集日志并写入kafka再到es集群

    条件:
    有kafka环境

    图形架构:

    环境准备

    172.31.2.101 es1 + kibana
    172.31.2.102 es2
    172.31.2.103 es3
    172.31.2.104 logstash1
    172.31.2.105 logstash2
    172.31.2.41 zookeeper + kafka
    172.31.2.42 zookeeper + kafka
    172.31.2.43 zookeeper + kafka
    172.31.2.107 web1
    

    先启动zookeeper

    [root@mq1 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
    [root@mq2 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
    [root@mq3 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
    

    启动kafka

    [root@mq1 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
    
    [root@mq2 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
    
    [root@mq3 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
    

    查看端口

    [root@mq1 ~]# ss -tanl | grep 9092
    
    LISTEN  0        50          [::ffff:172.31.2.41]:9092                 *:*
    

    web服务器改配置写入到kafka

    [root@es-web1 ~]# cat /etc/logstash/conf.d/kafka-nginx-es.conf
    
    input {
      file {
        path => "/var/log/nginx/access.log"
        start_position => "beginning"
        stat_interval => 3
        type => "nginx-accesslog"
        codec => "json"
     }
    
     file {
      path => "/apps/nginx/logs/error.log"
      start_position => "beginning"
      stat_interval => 3
      type => "nginx-errorlog"
      }
    }
    
    output {
      if [type] == "nginx-accesslog" {
        kafka {
          bootstrap_servers => "172.31.2.41:9092"
          topic_id => "long-linux21-accesslog"
          codec => "json"
      }}
    
      if [type] == "nginx-errorlog" {
        kafka {
          bootstrap_servers => "172.31.2.41:9092"
          topic_id => "long-linux21-errorlog"
          #codec => "json"
      }}
    }
    

    重启

    root@long:~# systemctl restart logstash
    

    在logstash服务器配置写入elasticsearch

    [root@logstash1 ~]# cat /etc/logstash/conf.d/kafka-to-es.conf
    
    input {
      kafka {
        bootstrap_servers => "172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092"
        topics => "long-linux21-accesslog"
        codec => "json"
     }
    
      kafka {
        bootstrap_servers => "172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092"
        topics => "long-linux21-errorlog"
        codec => "json"
      }
    }
    
    output {
      if [type] == "nginx-accesslog" {
        elasticsearch {
          hosts => ["172.31.2.101:9200"]
          index => "n19-long-kafka-nginx-accesslog-%{+YYYY.MM.dd}"
      }}
    
      if [type] == "nginx-errorlog" {
        elasticsearch {
          hosts => ["172.31.2.101:9200"]
          index => "n17-long-kafka-nginx-errorlog-%{+YYYY.MM.dd}"
      }}
    }
    

    测试

    [root@logstash1 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-to-es.conf -t
    

    重启

    root@long:~# systemctl restart logstash
    

    kafak工具

    写入kibana

  • 相关阅读:
    Eclipse快捷键大全(转载)
    IE9浏览Flash页面时显示错位并不停地闪烁
    flash全屏事件和键盘按下事件部分不能触发问题
    AS3摘要(转载)
    【as3手册小记】ActionScript 中处理全屏模式的注意事项
    巧用FlashPaper 让Word文档变Flash
    AS3视频照相截图(转载)
    Json串到json对象的转换
    映射文件详解(转)
    Jquery .ajax方法分析(一)
  • 原文地址:https://www.cnblogs.com/xuanlv-0413/p/15374798.html
Copyright © 2011-2022 走看看