zoukankan      html  css  js  c++  java
  • ELK--05 使用kafka缓存方案/kibana画图

    ELK--05 使用kafka缓存方案/kibana画图

    1.ELK使用kafka作为缓存


    #============注意es和kibana需要先启动、zook和kafak页需要java环境=============#
    
    
    0.配置密钥和host解析    #解析需要三台都配置
    [root@db01 ~]# cat >/etc/hosts<<EOF
    10.0.0.51 db01
    10.0.0.52 db02
    10.0.0.53 db03
    EOF
    #生成秘钥对并分发秘钥
    [root@db01 ~]# ssh-keygen
    [root@db01 ~]# ssh-copy-id 10.0.0.52
    [root@db01 ~]# ssh-copy-id 10.0.0.53
    
    1.安装zook
    ###db01操作
    [root@db01 ~]# yum install -y rsync
    [root@db01 ~]# cd /data/soft
    [root@db01 ~]# tar zxf zookeeper-3.4.11.tar.gz -C /opt/
    [root@db01 ~]# ln -s /opt/zookeeper-3.4.11/ /opt/zookeeper
    [root@db01 ~]# mkdir -p /data/zookeeper
    [root@db01 ~]# cat >/opt/zookeeper/conf/zoo.cfg<<EOF
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data/zookeeper
    clientPort=2181
    server.1=10.0.0.51:2888:3888
    server.2=10.0.0.52:2888:3888
    server.3=10.0.0.53:2888:3888
    EOF
    [root@db01 ~]# echo "1" > /data/zookeeper/myid
    [root@db01 ~]# cat /data/zookeeper/myid
    1
    [root@db01 ~]# rsync -avz /opt/zookeeper* 10.0.0.52:/opt/
    [root@db01 ~]# rsync -avz /opt/zookeeper* 10.0.0.53:/opt/
    
    ###db02操作
    [root@db02 ~]# yum install -y rsync
    [root@db02 ~]# mkdir -p /data/zookeeper
    [root@db02 ~]# echo "2" > /data/zookeeper/myid
    [root@db02 ~]# cat /data/zookeeper/myid
    2
    
    ###db03操作
    [root@db03 ~]# yum install -y rsync
    [root@db03 ~]# mkdir -p /data/zookeeper
    [root@db03 ~]# echo "3" > /data/zookeeper/myid
    [root@db03 ~]# cat /data/zookeeper/myid
    3
    
    2.启动zookeeper(三台机器都需要启动)
    [root@db01 ~]# /opt/zookeeper/bin/zkServer.sh start
    [root@db02 ~]# /opt/zookeeper/bin/zkServer.sh start
    [root@db03 ~]# /opt/zookeeper/bin/zkServer.sh start
    
    3.检查启动是否成功(三台机器都需要启动)
    [root@db01 ~]# /opt/zookeeper/bin/zkServer.sh status
    [root@db02 ~]# /opt/zookeeper/bin/zkServer.sh status
    [root@db03 ~]# /opt/zookeeper/bin/zkServer.sh status
    
    #如果启动正常mode应该是
    2个follower
    1个leader
    
    4.测试zookeeper通讯是否正常
    在一个节点上执行,创建一个频道
    /opt/zookeeper/bin/zkCli.sh -server 10.0.0.51:2181
    create /test "hello"
    
    在其他节点上看能否接收到
    /opt/zookeeper/bin/zkCli.sh -server 10.0.0.52:2181
    get /test
    
    5.安装kafka
    ###db01操作
    [root@db01 ~]# cd /data/soft/
    [root@db01 ~]# tar zxf kafka_2.11-1.0.0.tgz -C /opt/
    [root@db01 ~]# ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
    [root@db01 ~]# mkdir /opt/kafka/logs
    [root@db01 ~]# cat >/opt/kafka/config/server.properties<<EOF
    broker.id=1
    listeners=PLAINTEXT://10.0.0.51:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=24
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    EOF
    [root@db01 ~]# rsync -avz /opt/kafka* 10.0.0.52:/opt/
    [root@db01 ~]# rsync -avz /opt/kafka* 10.0.0.53:/opt/
    
    
    ###db02操作
    [root@db02 ~]# sed -i "s#10.0.0.51:9092#10.0.0.52:9092#g" /opt/kafka/config/server.properties
    [root@db02 ~]# sed -i "s#broker.id=1#broker.id=2#g" /opt/kafka/config/server.properties
    
    ###db03操作
    [root@db03 ~]# sed -i "s#10.0.0.51:9092#10.0.0.53:9092#g" /opt/kafka/config/server.properties
    [root@db03 ~]# sed -i "s#broker.id=1#broker.id=3#g" /opt/kafka/config/server.properties
    
    
    6.先前台启动kafka测试 (三台机器都需要启动)
    [root@db01 ~]# /opt/kafka/bin/kafka-server-start.sh  /opt/kafka/config/server.properties
    [root@db02 ~]# /opt/kafka/bin/kafka-server-start.sh  /opt/kafka/config/server.properties
    [root@db03 ~]# /opt/kafka/bin/kafka-server-start.sh  /opt/kafka/config/server.properties
    
    7.检查是否启动 (三台机器都需要启动)
    jps
    
    8.kafka前台启动测试命令发送消息
    创建命令
    /opt/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --partitions 3 --replication-factor 3 --topic  messagetest
    
    测试获取所有的频道
    /opt/kafka/bin/kafka-topics.sh  --list --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
    
    测试发送消息
    /opt/kafka/bin/kafka-console-producer.sh --broker-list  10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 --topic  messagetest
    
    其他节点测试接收
    /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic messagetest --from-beginning
    
    9.测试成功之后,可以放在后台启动  (三台都启动)
    按ctrl + c 停止kafka的前台启动,切换到后台启动
    [root@db01 ~]# /opt/kafka/bin/kafka-server-start.sh  -daemon /opt/kafka/config/server.properties
    [root@db02 ~]# /opt/kafka/bin/kafka-server-start.sh  -daemon /opt/kafka/config/server.properties
    [root@db03 ~]# /opt/kafka/bin/kafka-server-start.sh  -daemon /opt/kafka/config/server.properties
    
    10.配置filebeat
    [root@db01 ~]# cat >/etc/filebeat/filebeat.yml <<EOF
    filebeat.inputs:
    - type: log
      enabled: true 
      paths:
        - /var/log/nginx/access.log
      json.keys_under_root: true
      json.overwrite_keys: true
      tags: ["access"]
    
    - type: log
      enabled: true 
      paths:
        - /var/log/nginx/error.log
      tags: ["error"]
    
    output.kafka:
      hosts: ["10.0.0.51:9092", "10.0.0.52:9092", "10.0.0.53:9092"]
      topic: 'filebeat'
    
    setup.template.name: "nginx"
    setup.template.pattern: "nginx_*"
    setup.template.enabled: false
    setup.template.overwrite: true
    EOF
    
    重启filebeat
    [root@db01 ~]# systemctl restart filebeat 
    
    11.访问并检查kafka里有没有收到日志
    [root@db01 ~]# curl 10.0.0.51
    
    #获取filebeat的频道
    [root@db01 ~]#  /opt/kafka/bin/kafka-topics.sh  --list --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
    
    #接收filebeat频道发来的消息
    [root@db01 ~]#  /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic filebeat --from-beginning
    
    
    12.logstash配置文件
    [root@db01 ~]# cat > /etc/logstash/conf.d/kafka.conf<<EOF 
    input {
      kafka{
        bootstrap_servers=>["10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092"]
        topics=>["filebeat"]
        group_id=>"logstash"
        codec => "json"
      }
    }
    
    filter {
      mutate {
        convert => ["upstream_time", "float"]
        convert => ["request_time", "float"]
      }
    }
    
    output {
       stdout {}
       if "access" in [tags] {
          elasticsearch {
            hosts => "http://10.0.0.51:9200"
            manage_template => false
            index => "nginx_access-%{+yyyy.MM}"
          }
        }
        if "error" in [tags] {
          elasticsearch {
            hosts => "http://10.0.0.51:9200"
            manage_template => false
            index => "nginx_error-%{+yyyy.MM}"
          }
        }
    }
    EOF
    
    13.前台启动logatash测试
    #先清空ES以前生成的索引
    [root@db01 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf 
    
    生成访问日志
    [root@db01 ~]# curl 127.0.0.1
    

    测试:

    原数据:

    1.停掉db03的zookeeper

    #听到zookeeper
    [root@db03 ~]# /opt/zookeeper/bin/zkServer.sh stop
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/bin/../conf/zoo.cfg
    Stopping zookeeper ... STOPPED
    
    #查看jps,原来3个
    [root@db03 ~]# jps
    71553 Kafka
    72851 Jps
    
    #测试生成数据====db01测试
    [root@db01 ~]# curl 127.0.0.1
    db01-www
    
    #登录es-head查看
    

    2.停掉db02的zookeeper

    #查看jps数据
    [root@db02 ~]# jps
    74467 QuorumPeerMain
    78053 Jps
    76628 Kafka
    
    #停掉db02的zookeeper
    [root@db02 ~]# /opt/zookeeper/bin/zkServer.sh stop
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/bin/../conf/zoo.cfg
    Stopping zookeeper ... STOPPED
    
    #查看jps,剩两条
    [root@db02 ~]# jps
    78210 Jps
    76628 Kafka
    
    #测试生成数据====db01测试
    [root@db01 ~]# curl 127.0.0.1
    db01-www
    
    #登录es-head查看
    

    3.停掉db01的kafa

    #查看jps数据
    [root@db01 ~]# jps
    76902 Kafka
    48472 Logstash
    78089 Logstash
    79034 Jps
    74509 QuorumPeerMain
    
    #停掉db01的kafa
    [root@db01 ~]# /opt/kafka/bin/kafka-server-stop.sh 
    
    #查看jps数据
    [root@db01 ~]# jps
    79251 Jps
    48472 Logstash
    78089 Logstash
    74509 QuorumPeerMain
    
    #测试生成数据====db01测试
    [root@db01 ~]# curl 127.0.0.1
    db01-www
    
    #登录es-head查看
    

    #总结kafka实验
    1.前提条件
    - kafka和zook都是基于java的,所以需要java环境
    - 这俩比较吃资源,内存得够
    
    2.安装zook注意
    - 每台机器的myid要不一样,而且要和配置文件里的id对应上
    - 启动测试,角色为leader和follower
    - 测试发送和接受消息 
    
    3.安装kafka注意
    - kafka依赖于zook,所以如果zook不正常,kafka不能工作
    - kafka配置文件里要配上zook的所有IP的列表
    - kafka配置文件里要注意,写自己的IP地址
    - kafka配置文件里要注意,自己的ID是zook里配置的myid
    - kafka启动要看日志出现started才算是成功
    
    4.测试zook和kafka
    - 一端发送消息
    - 两端能实时接收消息
    
    5.配置filebeat
    - output要配上kafka的所有的IP列表
    
    6.配置logstash
    - input要写上所有的kafka的IP列表,别忘了[]
    - 前台启动测试成功后再后台启动
    
    7.毁灭测试结果
    - 只要还有1个zook和1个kafka节点,就能正常收集日志
    

    2.kibana画图展示


  • 相关阅读:
    寒假day08
    操作系统(一)操作系统的目标和作用
    数据结构排序算法稳定性总结——写给自己看
    网络请求生命周期
    PHP 不同类型之间的松散和严格比较
    php配置可被设定范围
    laravel5.7 前后端分离开发 实现基于API请求的token认证
    laravel 自动加载 自定义的文件/辅助函数
    laravel5.7 migrate 时报错 Specified key was too long error 解决方案
    PHP 命名空间
  • 原文地址:https://www.cnblogs.com/gongjingyun123--/p/12491181.html
Copyright © 2011-2022 走看看