原理待补充:
kafka依赖于zookeeper集群。
都是基于java 由于源码安装jdk 未声明bin下java 在各自server配置文件中声明
JAVA_HOME=/usr/local/jdk1.8.0_241
引入kafka缓存日志之ZK搭建
之前架构 Filebeat(多台) -> Logstash(正则) -> Elasticsearch(入库) -> Kibana展现 架构优化,流行的架构 Filebeat(多台) -> Kafka(或Redis) -> Logstash(正则多台) -> Elasticsearch(入库) -> Kibana展现 Kafka服务器的搭建 Kafka依赖于Zookeeper 依赖于Java环境 Kafka依赖于Zookeeper 官方网站:https://zookeeper.apache.org/ 下载ZK的二进制包 解压到对应目录完成安装/usr/local/zookeeper JAVA_HOME="/usr/local/jdk1.8.0_241" Java环境安装 yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel java -version 数据目录准备 mkdir -pv /usr/local/zookeeper/data zk配置 cd /usr/local/zookeeper/conf/ cp zoo_sample.cfg zoo.cfg 更改配置/usr/local/zookeeper/conf/zoo.cfg dataDir=/usr/local/zookeeper/data autopurge.snapRetainCount=3 autopurge.purgeInterval=1 zk使用systemctl管理/usr/lib/systemd/system/zookeeper.service [Unit] Description=zookeeper After=network.target [Service] Type=forking ExecStart=/usr/local/zookeeper/bin/zkServer.sh start User=root [Install] WantedBy=multi-user.target 启动zk systemctl enable zookeeper systemctl restart zookeeper 启动zk集群查看状态 ./zkServer.sh start ./zkServer.sh status
引入kafka缓存日志之ZK集群搭建 集群配置/usr/local/zookeeper/conf/zoo.cfg server.1=192.168.238.90:2888:3888 server.2=192.168.238.92:2888:3888 server.3=192.168.238.94:2888:3888 更改zk集群的id /usr/local/zookeeper/data/myid 分别为1 2 3 zk使用systemctl管理/usr/lib/systemd/system/zookeeper.service [Unit] Description=zookeeper After=network.target [Service] Type=forking ExecStart=/usr/local/zookeeper/bin/zkServer.sh start User=root [Install] WantedBy=multi-user.target 启动zk systemctl enable zookeeper systemctl restart zookeeper 启动zk集群查看状态 ./zkServer.sh start ./zkServer.sh status 验证zk集群,创建一个节点,验证 ./zkCli.sh create /sjg create /sjg/sjg
引入kafka缓存日志之Kafka集群搭建
Kafka下载地址 Kafka官网:http://kafka.apache.org/ 下载Kafka的二进制包 解压到对应目录完成安装 Kafka下载 http://kafka.apache.org/downloads 修改Kafka配置server.properties =0 listeners=PLAINTEXT://xxx:9092 log.retention.hours=1 zookeeper.connect=192.168.238.90:2181,192.168.238.92:2181,192.168.238.94:2181 Jvm内存修改/usr/local/kafka_2.12-2.5.0/bin/kafka-server-start.sh KAFKA_HEAP_OPTS Kafka使用systemctl管理/usr/lib/systemd/system/kafka.service [Unit] Description=kafka After=network.target [Service] Type=simple ExecStart=/usr/local/kafka_2.12-2.5.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-2.5.0/config/server.properties User=root [Install] WantedBy=multi-user.target 创建topic /usr/local/kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper 192.168.238.90:2181 --replication-factor 2 --partitions 1 --topic sjg /usr/local/kafka_2.12-2.5.0/bin/kafka-topics.sh --describe --zookeeper 192.168.238.90:2181 --topic sjg Filebeat和Logstash间引入Kafka集群 架构演进 filebeat -> logstsash -> es filebeat -> kafka(集群) -> logstash(多台) -> es Logstash读取Kafka input { kafka { bootstrap_servers => "192.168.238.90:9092,192.168.238.92:9092" topics => ["sjg"] group_id => "sjggroup" codec => "json" } } Filebeat日志发送到Kafka filebeat.inputs: - type: log tail_files: true backoff: "1s" paths: - /var/log/nginx/access.json.log processors: - drop_fields: fields: ["agent","ecs","log","input"] output: kafka: hosts: ["192.168.238.90:9092", "192.168.238.92:9092"] topic: sjg Kafka查看队列信息 查看Group: ./kafka-consumer-groups.sh --bootstrap-server 172.17.166.217:9092 --list 查看队列:./kafka-consumer-groups.sh --bootstrap-server 172.17.166.217:9092 --group test2 --describe
Filebeat和Logstash间引入Kafka集群多日志分析
Filebeat配置 filebeat.inputs: - type: log tail_files: true backoff: "1s" paths: - /var/log/nginx/access.log fields: type: access fields_under_root: true - type: log tail_files: true backoff: "1s" paths: - /var/log/secure fields: type: system fields_under_root: true processors: - drop_fields: fields: ["agent","ecs","log","input"] output: kafka: hosts: ["192.168.238.90:9092", "192.168.238.92:9092"] topic: sjg Logstash配置 input { kafka { bootstrap_servers => "192.168.238.90:9092,192.168.238.92:9092" topics => ["sjg"] group_id => "sjggroup" codec => "json" } } filter { if [type] == "access" { grok { match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) [%{HTTPDATE:time_local}] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' } remove_field => ["message"] } date { match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } } else if [type] == "system" { } } output { if [type] == "access" { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] user => "elastic" password => "sjgpwd" index => "sjgaccess-%{+YYYY.MM.dd}" } } else if [type] == "system" { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] user => "elastic" password => "sjgpwd" index => "sjgsystem-%{+YYYY.MM.dd}" } } }
建两个索引来说明用户管理 可设置某个索引只读 观察elastic角色 super角色 创建角色 给予某个索引的读权限read-sjgaccess,给某个索引只读权限read 创建用户sjg, 给kibana_user、read-sjgaccess 角色 删除索引测试 删除索引 给all权限就能删除索引