zoukankan      html  css  js  c++  java
  • 使用kafka作为缓存收集日志

    一、环境准备

    系统版本 主机名 IP地址 所运行的服务
    Centos 7.5 kafka01 192.168.1.1 zookeeper、kafka、ES、kibana
    Centos 7.5 kafka02 192.168.1.2 zookeeper、kafka、logstash
    Centos 7.5 kafka03 192.168.1.3 zookeeper、kafka、ES、nginx、filebeat

    由于电脑性能较低,所以就不开那么多机器了!

    二、实现kafka作为缓存收集日志信息

    2.1 安装zookeeper

    $ echo -e "192.168.1.1 kafka01
    192.168.1.2 kafka02
    192.168.1.3 kafka03" >> /etc/hosts
    $ wget https://downloads.apache.org/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
    $ tar zxf zookeeper-3.4.14.tar.gz -C /opt/
    $ ln -s /opt/zookeeper-3.4.14/ /opt/zookeeper
    $ cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg 
    $ mkdir -p /data/zookeeper
    $ vim /opt/zookeeper/conf/zoo.cfg 
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data/zookeeper
    clientPort=2181
    server.1=192.168.1.1:2888:3888
    server.2=192.168.1.2:2888:3888
    server.3=192.168.1.3:2888:3888
    $ echo "1" > /data/zookeeper/myid
    #将zookeeper相关的文件目录远程传输到另外两台
    $ rsync -avz /opt/zookeeper* kafka02:/opt/
    $ rsync -avz /data/* kafka02:/data
    $ rsync -avz /opt/zookeeper* kafka03:/opt/
    $ rsync -avz /data/* kafka03:/data
    #启动
    $ /opt/zookeeper/bin/zkServer.sh start
    $ /opt/zookeeper/bin/zkServer.sh status
    $ /opt/zookeeper/bin/zkServer.sh start
    #kafka02和kafka03更改myid并启动
    $ echo "2" > /data/zookeeper/myid 
    $ /opt/zookeeper/bin/zkServer.sh start
    $ echo "3" > /data/zookeeper/myid
    $ /opt/zookeeper/bin/zkServer.sh start
    #查看各个节点的状态
    $ /opt/zookeeper/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/bin/../conf/zoo.cfg
    Mode: follower
    $ /opt/zookeeper/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/bin/../conf/zoo.cfg
    Mode: leader
    $ /opt/zookeeper/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/bin/../conf/zoo.cfg
    Mode: follower
    #保证三个节点有一个leader
    #测试
    $ /opt/zookeeper/bin/zkCli.sh -server kafka01:2181
    [zk: kafka01:2181(CONNECTED) 0] create /test "hello"
    #插入数据
    $ /opt/zookeeper/bin/zkCli.sh -server kafka02:2181
    [zk: kafka02:2181(CONNECTED) 0] get /test
    #获取数据
    $ /opt/zookeeper/bin/zkCli.sh -server kafka03:2181
    [zk: kafka03:2181(CONNECTED) 0] get /test
    #获取数据
    

    2.2 安装kafka

    $ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz
    $ tar zxf kafka_2.11-2.4.1.tgz -C /opt
    $ ln -s /opt/kafka_2.11-2.4.1/ /opt/kafka
    $ mkdir /opt/kafka/logs
    $ vim /opt/kafka/config/server.properties
    broker.id=1
    listeners=PLAINTEXT://192.168.1.1:9092
    log.dirs=/opt/kafka/logs
    log.retention.hours=24
    zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
    
    $ rsync -avz /opt/kafka* kafka02:/opt/
    $ rsync -avz /opt/kafka* kafka03:/opt/
    $ /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties 
    #最后一行出现KafkaServer id和started则表示启动成功;确认后先按Ctrl+C停止前台进程,再放后台启动
    $ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
    $ vim /opt/kafka/config/server.properties
    broker.id=2
    listeners=PLAINTEXT://192.168.1.2:9092
    log.dirs=/opt/kafka/logs
    log.retention.hours=24
    zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
    $ /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
    $ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
    $ vim /opt/kafka/config/server.properties
    broker.id=3
    listeners=PLAINTEXT://192.168.1.3:9092
    log.dirs=/opt/kafka/logs
    log.retention.hours=24
    zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
    $ /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
    $ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
    #测试
    $ /opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 --partitions 3 --replication-factor 3 --topic messagetest
    $ /opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092 --topic messagetest
    #进入交互模式随便输入信息
    $ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server  192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092 --topic messagetest --from-beginning
    $ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server  192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092 --topic messagetest --from-beginning
    #查看是否可以获取到信息
    

    2.3 部署ES

    $ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.rpm
    $ yum -y install elasticsearch-6.6.0.rpm
    $ egrep -v '#|^$' /etc/elasticsearch/elasticsearch.yml 
    node.name: kafka01
    path.data: /elk/data
    path.logs: /elk/log
    network.host: 192.168.1.1
    http.port: 9200
    $ mkdir -p /elk/{data,log}
    $ chown elasticsearch.elasticsearch /elk -R
    $ systemctl start elasticsearch
    $ ss -lnt | grep 9200
    LISTEN     0      128     ::ffff:192.168.1.1:9200                    :::*    
    

    2.4 部署kibana

    $ wget https://artifacts.elastic.co/downloads/kibana/kibana-6.6.0-x86_64.rpm
    $ yum -y install kibana-6.6.0-x86_64.rpm
    $ egrep -v '#|^$' /etc/kibana/kibana.yml 
    server.port: 5601
    server.host: "192.168.1.1"
    server.name: "kafka01"
    elasticsearch.hosts: ["http://192.168.1.1:9200"]
    kibana.index: ".kibana"
    $ systemctl start kibana
    $ ss -lnt | grep 5601
    LISTEN     0      128    192.168.1.1:5601                     *:*         
    

    访问页面:
    20200328175433

    2.5 部署nginx、filebeat

    $ vim /etc/yum.repos.d/nginx.repo
    [nginx-stable]
    name=nginx stable repo
    baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
    gpgcheck=0
    enabled=1
    $ yum -y install nginx httpd-tools
    $ vim /etc/nginx/nginx.conf
    #添加以下内容将其日志格式转换为json格式
        log_format json '{ "@time_local": "$time_local", '
                            '"remote_addr": "$remote_addr", '
                            '"referer": "$http_referer", '
                            '"request": "$request", '
                            '"status": $status, '
                            '"bytes": $body_bytes_sent, '
                            '"agent": "$http_user_agent", '
                            '"x_forwarded": "$http_x_forwarded_for", '
                            '"up_addr": "$upstream_addr",'
                            '"up_host": "$upstream_http_host",'
                            '"up_resp_time": "$upstream_response_time",'
                            '"request_time": "$request_time"'
    ' }';  
    
        access_log  /var/log/nginx/access.log  json;
    $ nginx	
    $ wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.0-x86_64.rpm
    $ yum -y install filebeat-6.6.0-x86_64.rpm 
    $ vim /etc/filebeat/filebeat.yml 
    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /var/log/nginx/access.log
      json.keys_under_root: true
      json.overwrite_keys: true
      tags: ["access"]
    
    - type: log
      enabled: true
      paths:
        - /var/log/nginx/error.log
      tags: ["error"]
    
    output.kafka:
      hosts: ["192.168.1.1:9092","192.168.1.2:9092","192.168.1.3:9092"]
      topic: elklog
    $ systemctl start filebeat
    $ ab -c 100 -n 100 http://192.168.1.3/
    $ ab -c 100 -n 100 http://192.168.1.3/error
    

    2.6 部署logstash

    $ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.rpm
    $ yum -y install logstash-6.6.0.rpm
    $ vim /etc/logstash/conf.d/kafka.conf
    #名称可以自定义,保证是在这个路径下
    input{
      kafka {
        bootstrap_servers => "192.168.1.2:9092"
        topics => ["elklog"]
        group_id => "logstash"
        codec => "json"
      }
    }
    
    filter {
      mutate {
        convert => ["up_resp_time","float"]
        convert => ["request_time","float"]
      }
    }
    
    output {
      if "access" in [tags] {
        elasticsearch {
          hosts => "http://192.168.1.1:9200"
          manage_template => false
          index => "nginx_access-%{+yyyy.MM}"
            }
      }
      if "error" in [tags] {
        elasticsearch {
          hosts => "http://192.168.1.1:9200"
          manage_template => false
          index => "nginx_error-%{+yyyy.MM}"
            }
      }
    }
    $ /usr/share/logstash/bin/logstash  -f /etc/logstash/conf.d/kafka.conf
    

    如图:
    20200328183757
    由于不是一次成功的,所以图片中日志的条目可能有点不符!

    自行添加索引,结果如图:
    20200328184106

    *************** 当你发现自己的才华撑不起野心时,就请安静下来学习吧!***************
  • 相关阅读:
    二分图之最小边覆盖(poj3020)
    第一章:计算机网络概述
    X Window 简单的新手教程
    SharePoint Permission Analyzer 权限分析仪
    《源创新》:破坏性创新换了个说法,有陷入锤子钉子模式的嫌疑,书中的案例可以看一看。
    《金融可以颠覆历史》:隐藏在历史事件背后的金融制度发展历程
    转发:三伏天话“三伏贴”
    《浪潮之巅》(第2版):精彩的IT商战史
    《史玉柱自述》:管理者要谦虚,好的经营策略是试出来的
    《生活中的经济学》:主张让市场去解决生活中的问题,离中国的现实有点远
  • 原文地址:https://www.cnblogs.com/lvzhenjiang/p/14199348.html
Copyright © 2011-2022 走看看