准备三台centos7的服务器 两核两G的
关闭防火墙和SELinux
systemctl stop firewalld
setenforce 0
rpm -ivh jdk-8u131-linux-x64_.rpm 准备中... ################################# [100%] 正在升级/安装... 1:jdk1.8.0_131-2000:1.8.0_131-fcs ################################# [100%] Unpacking JAR files... tools.jar... plugin.jar... javaws.jar... deploy.jar... rt.jar... jsse.jar... charsets.jar... localedata.jar... java -version //执行这条命令可以看到jdk的版本 就是jdk环境配置成功 java version "1.8.0_131" Java(TM) SE Runtime Environment (build 1.8.0_131-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
2. 因为我装的是无图形化界面的centos 每一台虚拟机都需要
yum -y install wget cd /usr/local/src ##这个下载的是zookeeper wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz ##这个下载的是kafka wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
3. 解压zookeeper 然后移动到/usr/local/zookeeper kafka做同样的操作 kafka移动到/usr/local/kafka
tar -zxf zookeeper-3.4.14.tar.gz mv zookeeper-3.4.14 /usr/local/zookeeper tar -zxf kafka_2.11-2.2.0.tgz mv kafka_2.11-2.2.0 /usr/local/kafka
4.在 /usr/local/zookeeper创建两个目录 zkdatalog zkdata
cd /usr/local/zookeeper
mkdir {zkdatalog,zkdata}
5.进入/usr/loca/zookeeper/conf 复制一个配置文件 修改复制出来的配置文件
cd /usr/local/zookeeper/conf cp zoo_sample.cfg zoo.cfg vim zoo.cfg ******************************* tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/local/zookeeper/zkdata datalogDir=/usr/local/zookeeper/zkdatalog clientPort=2181 server.1=192.168.18.140:2888:3888 server.2=192.168.18.141:2888:3888 server.3=192.168.18.142:2888:3888 *******************************
6. 每一台的操作都不一样
第一台 echo '1' > /usr/local/zookeeper/zkdata/myid 第二台 echo '2' > /usr/local/zookeeper/zkdata/myid 第三胎 echo '3' > /usr/local/zookeeper/zkdata/myid
7.启动服务 启动的时候按顺序开启 第一台 第二台 第三台
cd /usr/local/zookeeper/bin ./zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
8.查看是否开启 zookpeer 的状态
第一台 显示这个状态表示成功 ./zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: follower 第二台 ./zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: leader 第三台 ./zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: follower
搭建kafka
第一台 cd /usr/local/kafka/config vim server.properties ********************* broker.id=1 advertised.listeners=PLAINTEXT://kafka01:9092 zookeeper.connect=192.168.18.140:2181,192.168.18.141:2181,192.168.18.142:2181 (三台服务器的IP地址) 第2台 cd /usr/local/kafka/config vim server.properties ********************* broker.id=2 advertised.listeners=PLAINTEXT://kafka02:9092 zookeeper.connect=192.168.18.140:2181,192.168.18.141:2181,192.168.18.142:2181 (三台服务器的IP地址) 第3台 cd /usr/local/kafka/config vim server.properties ********************* broker.id=3 advertised.listeners=PLAINTEXT://kafka03:9092 zookeeper.connect=192.168.18.140:2181,192.168.18.141:2181,192.168.18.142:2181 (三台服务器的IP地址)
10.vim /etc/hosts
添加以下内容在每一台服务器上 192.168.18.140 kafka01 192.168.18.141 kafka02 192.168.18.142 kafka03
11.启动kafka的命令 每一台都启动kafka
cd /usr/local/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties
启动之后可以查看端口9092
ss -ntlp | grep 9092
12.创建主题 topics
cd /usr/local/kafka/bin ./kafka-topics.sh --create --zookeeper 192.168.18.140:2181(这个ip地址随便写三台服务器中的一个就可以)--replication-factor 2 --partitions 3 --topic wg007
查看主题 如果可以显示刚刚创建的就是成功了
cd /usr/local/kafka/bin ./kafka-topics.sh --list --zookeeper 192.168.18.140:2181
模拟生产者 执行代码后就会有一个小的 >
cd /usr/local/kafka/bin ./kafka-console-producer.sh --broker-list 192.168.18.140:9092 --topic wg007 >
在第二台服务器上模拟消费者
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.18.141:9092 --topic wg007 --from-beginning
测试模拟生产者和消费者是否成功
在模拟生产者的服务器上写一些东西 可以在模拟消费者的服务器上可以看到表示成功如下所示
写一个脚本用来创建kafka的topic
cd /usr/local/kafka/bin vim kafka-create-topics.sh ################################# #!/bin/bash read -p "请输入一个你想要创建的topic:" topic cd /usr/local/kafka/bin ./kafka-topics.sh --create --zookeeper 192.168.18.140:2181 --replication-factor 2 --partitions 3 --topic ${topic}
创建一个新的yum源
vim /etc/yum.repo.d/filebeat.repo ***加入以下内容 [filebeat-6.x] name=Elasticsearch repository for 6.x packages baseurl=https://artifacts.elastic.co/packages/6.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
安装
yum -y install filebeat
编辑filebeat的配置文件
cd /etc/filebeat vim filebeat.yml 修改以下内容 enabled: true paths: - /var/log/nginx/*.log //写这个路径的前提是安装nginx output.kafka: enabled: true # Array of hosts to connect to. hosts: ["192.168.18.140:9092","192.168.18.141:9092","192.168.18.142:9092"] topic: nginx5 //写这个nginx_log1 的前提是有nginx_log1的topic 上面有生产者的脚本
安装nginx
yum -y install epel*
yum -y install nginx
启动nginx和filebeat
systemctl start filebeat
systemctl enable filebeat
systemctl start nginx
可以给ningx生产一些数据
yum -y install httpd-tools ab -n1000 -c 200 http://127.0.0.1/cccc //这条命令可以多执行几次
可以在安装filebeat的服务器上测试一下nginx的服务
curl -I 192.168.18.140:80
在模拟消费者的服务器上 如果可以显示一下内容 表示成功了
cd /usr/local/kafka/bin ./kafka-console-consumer.sh --bootstrap-server 192.168.18.141:9092 --topic nginx5(这里的topic的名字一定要和filebeat的配置文件里的一致) --from-beginning
如图所示
现在开始收集多个日志 system nginx secure 和日志 编辑filebeat的配置文件
#讲filebeat的input改成下面的样子 filebeat.inputs: #这个是收集nginx的日志 - type: log enabled: true paths: - /var/log/nginx/*.log //nginx的日志文件 fields: log_topics: nginx5 //这个是收集nginx的topic #这个是收集system的日志 - type: log enabled: true paths: - /var/log/messages //system的日志文件目录 fields: log_topics: messages //这个是收集system的topic #收集secure的日志 - type: log enabled: true paths: - /var/log/secure //secure的日志文件 fields: log_topics: secure //这个是收集secure的topic output.kafka: enabled: true hosts: ["192.168.18.140:9092","192.168.18.141:9092","192.168.18.142:9092"] topic: '%{[fields][log_topics]}'
注意:一点更要创建三个topic 就是上面的配置文件提到的topic 可以使用上面的脚本创建topic 重启filebeat
systemctl restart filebeat
我是用了三台服务器来做EFK集群
接下来在第二胎安装logstash 在第三胎安装ES集群(就是elasticsearch和kibana)
安装
安装logstash在第二台 rpm -ivh logstash-6.6.0.rpm 安装 kibana 和elasticsearch rpm -ivh elasticsearch-6.6.2.rpm rpm -ivh kibana-6.6.2-x86_64.rpm
编辑elasticsearch的配置文件
vim /etc/elasticsearch/elasticsearch.yml ############################ network.host: 192.168.18.142 http.port: 9200
启动elasticsearch
systemctl restart elasticsearch
编辑kibana的配置文件
vim /etc/kibana/kibana.yml ###################### server.port: 5601 server.host: "0.0.0.0" elasticsearch.hosts: ["http://192.168.18.142:9200"] 然后启动kibana systemctl restart kibana
现在开始 编写logstash的三个配置文件
cd /etc/logstash/conf.d
现在是messages的 vim messages.conf
input { kafka { bootstrap_servers => ["192.168.18.140:9092,192.168.18.141:9092,192.168.18.142:9092"] group_id => "logstash" topics => "messages" consumer_threads => 5 } } output { elasticsearch { hosts => "192.168.18.142:9200" index => "messages-%{+YYYY.MM.dd}" } }
现在是nginx的 vim nginx.conf
input { kafka { bootstrap_servers => ["192.168.18.140:9092,192.168.18.141:9092,192.168.18.142:9092"] group_id => "logstash" topics => "nginx5" consumer_threads => 5 } } filter { grok { match => { "message" => "%{NGINXACCESS}" } } } output { elasticsearch { hosts => "192.168.18.142:9200" index => "nginx1_log-%{+YYYY.MM.dd}" } }
现在是secure的 vim secure.conf
input { kafka { bootstrap_servers => ["192.168.18.140:9092,192.168.18.141:9092,192.168.18.142:9092"] group_id => "logstash" topics => "secure" consumer_threads => 5 } } output { elasticsearch { hosts => "192.168.18.142:9200" index => "secure-%{+YYYY.MM.dd}" } }
添加管道
vim /etc/logstash/pipelines.yml - pipeline.id: messages path.config: "/etc/logstash/conf.d/messages.conf" - pipeline.id: nginx path.config: "/etc/logstash/conf.d/nginx.conf" - pipeline.id: secure path.config: "/etc/logstash/conf.d/secure.conf"
正则匹配
cd /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns vim nginx_access URIPARAM1 [A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?-[]]* NGINXACCESS %{IPORHOST:client_ip} (%{USER:ident}|- ) (%{USER:auth}|-) [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} (%{NOTSPACE:request}|-)(?: HTTP/%{NUMBER:http_version})?|-)" %{NUMBER:status} (?:%{NUMBER:bytes}|-) "(?:%{URI:referrer}|-)" "%{GREEDYDATA:agent}"
重启logstash systemctl restart logstash
可以在第二台机器上查看模拟消费者的状态 messages的
执行下面的命令可以显示出日志内容就是成功
cd /usr/local/kafak/bin ./kafka-console-consumer.sh --bootstrap-server 192.168.18.141:9092 --topic messages --from-beginning<br><br>
可以在第二台机器上查看模拟消费者的状态 secure的
执行下面的命令可以显示出日志内容就是成功
cd /usr/local/kafak/bin ./kafka-console-consumer.sh --bootstrap-server 192.168.18.141:9092 --topic secure --from-beginning
可以在第二台机器上查看模拟消费者的状态 nginx的 nginx的可以生产一些日志文件 创建一些访问记录
执行下面的命令可以显示出日志内容就是成功
cd /usr/local/kafak/bin ./kafka-console-consumer.sh --bootstrap-server 192.168.18.141:9092 --topic nginx5 --from-beginning