zoukankan      html  css  js  c++  java
  • ELK集群之kafka(7)

    原理待补充:

    kafka依赖于zookeeper集群。

    都是基于java 由于源码安装jdk 未声明bin下java 在各自server配置文件中声明

    JAVA_HOME=/usr/local/jdk1.8.0_241

    引入kafka缓存日志之ZK搭建

    之前架构
    Filebeat(多台)  -> Logstash(正则) -> Elasticsearch(入库) -> Kibana展现
    
    架构优化,流行的架构
    Filebeat(多台)  ->  Kafka(或Redis) ->     Logstash(正则多台) -> Elasticsearch(入库) -> Kibana展现
    
    Kafka服务器的搭建
      Kafka依赖于Zookeeper
      依赖于Java环境
    
    Kafka依赖于Zookeeper
    官方网站:https://zookeeper.apache.org/
    下载ZK的二进制包
    解压到对应目录完成安装/usr/local/zookeeper
    JAVA_HOME="/usr/local/jdk1.8.0_241"
    
    Java环境安装
      yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
      java -version
    
    数据目录准备
      mkdir -pv /usr/local/zookeeper/data
    
    zk配置
      cd /usr/local/zookeeper/conf/
      cp zoo_sample.cfg zoo.cfg
    
    更改配置/usr/local/zookeeper/conf/zoo.cfg
    dataDir=/usr/local/zookeeper/data
    autopurge.snapRetainCount=3
    autopurge.purgeInterval=1
    
    zk使用systemctl管理/usr/lib/systemd/system/zookeeper.service
    [Unit]
    Description=zookeeper
    After=network.target
    [Service]
    Type=forking
    ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
    User=root
    [Install]
    WantedBy=multi-user.target
    
    启动zk
      systemctl enable zookeeper
      systemctl restart zookeeper
    
    启动zk集群查看状态
    ./zkServer.sh start
    ./zkServer.sh status
    zk单实例
            引入kafka缓存日志之ZK集群搭建
    集群配置/usr/local/zookeeper/conf/zoo.cfg
    server.1=192.168.238.90:2888:3888
    server.2=192.168.238.92:2888:3888
    server.3=192.168.238.94:2888:3888
    
    更改zk集群的id
      /usr/local/zookeeper/data/myid
      分别为1 2 3 
    
    zk使用systemctl管理/usr/lib/systemd/system/zookeeper.service
    [Unit]
    Description=zookeeper
    After=network.target
    [Service]
    Type=forking
    ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
    User=root
    [Install]
    WantedBy=multi-user.target
    
    启动zk
      systemctl enable zookeeper
      systemctl restart zookeeper
    
    启动zk集群查看状态
    ./zkServer.sh start
    ./zkServer.sh status
    
    验证zk集群,创建一个节点,验证
    ./zkCli.sh
    create /sjg
    create /sjg/sjg
    zk集群部署

    引入kafka缓存日志之Kafka集群搭建

    Kafka下载地址
    Kafka官网:http://kafka.apache.org/
    下载Kafka的二进制包
    解压到对应目录完成安装
    
    Kafka下载
      http://kafka.apache.org/downloads
    
    修改Kafka配置server.properties
     =0
    listeners=PLAINTEXT://xxx:9092
    log.retention.hours=1
    zookeeper.connect=192.168.238.90:2181,192.168.238.92:2181,192.168.238.94:2181
    
    Jvm内存修改/usr/local/kafka_2.12-2.5.0/bin/kafka-server-start.sh
    KAFKA_HEAP_OPTS
    
    Kafka使用systemctl管理/usr/lib/systemd/system/kafka.service
    [Unit]
    Description=kafka
    After=network.target
    [Service]
    Type=simple
    ExecStart=/usr/local/kafka_2.12-2.5.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-2.5.0/config/server.properties
    User=root
    [Install]
    WantedBy=multi-user.target
    
    创建topic
    /usr/local/kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper 192.168.238.90:2181 --replication-factor 2 --partitions 1 --topic sjg
    /usr/local/kafka_2.12-2.5.0/bin/kafka-topics.sh --describe --zookeeper 192.168.238.90:2181 --topic sjg
    
            Filebeat和Logstash间引入Kafka集群
    架构演进
    filebeat -> logstsash -> es
    filebeat -> kafka(集群) -> logstash(多台) -> es
    
    Logstash读取Kafka
    input {
      kafka {
        bootstrap_servers => "192.168.238.90:9092,192.168.238.92:9092"
        topics => ["sjg"]
        group_id => "sjggroup"
        codec => "json"
      }
    }
    
    Filebeat日志发送到Kafka
    filebeat.inputs:
    - type: log
      tail_files: true
      backoff: "1s"
      paths:
          - /var/log/nginx/access.json.log
    processors:
    - drop_fields:
        fields: ["agent","ecs","log","input"]
    output:
      kafka:
        hosts: ["192.168.238.90:9092", "192.168.238.92:9092"]
        topic: sjg
    
    Kafka查看队列信息
    查看Group: ./kafka-consumer-groups.sh --bootstrap-server 172.17.166.217:9092 --list
    查看队列:./kafka-consumer-groups.sh --bootstrap-server 172.17.166.217:9092 --group test2 --describe
    kafka集群搭建测试

    Filebeat和Logstash间引入Kafka集群多日志分析

    Filebeat配置
    filebeat.inputs:
    - type: log
      tail_files: true
      backoff: "1s"
      paths:
          - /var/log/nginx/access.log
      fields:
        type: access
      fields_under_root: true
    
    - type: log
      tail_files: true
      backoff: "1s"
      paths:
          - /var/log/secure
      fields:
        type: system
      fields_under_root: true
    processors:
    - drop_fields:
        fields: ["agent","ecs","log","input"]
    output:
      kafka:
        hosts: ["192.168.238.90:9092", "192.168.238.92:9092"]
        topic: sjg
    
    Logstash配置
    input {
      kafka {
        bootstrap_servers => "192.168.238.90:9092,192.168.238.92:9092"
        topics => ["sjg"]
        group_id => "sjggroup"
        codec => "json"
      }
    }
    filter {
      if [type] == "access" {
        grok {
          match => {
            "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) [%{HTTPDATE:time_local}] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
          }
          remove_field => ["message"]
        }
        date {
          match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
          target => "@timestamp"
        }
      }
      else if [type] == "system" {
      }
    }
    output {
      if [type] == "access" {
        elasticsearch {
          hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
          user => "elastic"
          password => "sjgpwd"
          index => "sjgaccess-%{+YYYY.MM.dd}"
        }
      }
      else if [type] == "system" {
        elasticsearch {
          hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
          user => "elastic"
          password => "sjgpwd"
          index => "sjgsystem-%{+YYYY.MM.dd}"
        }
      }
    }
    filebeat+kafka+logstash
    建两个索引来说明用户管理
      可设置某个索引只读
    
    观察elastic角色
      super角色
    
    创建角色
      给予某个索引的读权限read-sjgaccess,给某个索引只读权限read
      创建用户sjg, 给kibana_user、read-sjgaccess 角色
    
    删除索引测试
      删除索引
      给all权限就能删除索引
    kibana用户管理
  • 相关阅读:
    PhoneGap打包webApp
    mysql触发器实例说明
    mysql索引总结
    python:生成器
    python:装饰器
    python:局部变量与全局变量
    python:函数
    python:文件操作
    python:集合及其运算
    python:字符串常用函数
  • 原文地址:https://www.cnblogs.com/dahuige/p/15067497.html
Copyright © 2011-2022 走看看