环境准备
1.服务器概览
ip | 操作系统 | 说明 | 安装内容 |
---|---|---|---|
192.168.0.113 | centos 7 | master节点 | jdk1.8, kafka_2.11-0.10.1.1, zookeeper-3.4.8.tar |
192.168.0.114 | centos 7 | master节点 | jdk1.8, kafka_2.11-0.10.1.1, zookeeper-3.4.8.tar |
192.168.0.115 | centos 7 | master节点 | jdk1.8, kafka_2.11-0.10.1.1, zookeeper-3.4.8.tar |
2.服务器环境安装
jdk1.8 安装
注意:三台服务均执行
#添加host 192.168.0.112 master.kafka 192.168.0.114 worker1.kafka 192.168.0.115 worker2.kafka #执行以下命令关闭防火墙 [root@node1 ~]systemctl stop firewalld && systemctl disable firewalld [root@node1 ~]setenforce 0 #将SELINUX的值改成disabled [root@node1 ~]vim /etc/selinux/config SELINUX=disabled #修改系统参数 sudo vim /etc/security/limits.conf * soft nproc 65536 * hard nproc 65536 * soft nofile 65536 * hard nofile 65536 sudo vim /etc/sysctl.conf vm.max_map_count= 262144 sudo sysctl -p #重启服务器 [root@node1 ~]reboot
3.Zookeeper集群安装
4.Kafka 安装
下载Kafka安装包:http://kafka.apache.org/downloads
下载之后解压即可。(由于Kafka是用Scala语言开发,运行在JVM上,所以要先安装JDK)
创建Kafka目录并安装:
// 解压下载好的kafka压缩包并重命名 cd /usr/local tar -zxvf /usr/local/kafka_2.11-0.10.1.1.tgz mv kafka_2.11-0.10.1.1 kafka // 修改配置文件 vi ./kafka/config/server.properties
修改配置文件: 进入到config目录
cd /opt/kafka/kafka/config/
主要关注:server.properties 这个文件即可,我们可以发现在目录下:
有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群
修改配置文件:
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 port=19092 #当前kafka对外提供服务的端口默认是9092 host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。 num.network.threads=3 #这个是borker进行网络处理的线程数 num.io.threads=8 #这个是borker进行I/O处理的线程数 log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 num.partitions=1 #默认的分区数,一个topic默认1个分区数 log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务 replica.fetch.max.bytes=5242880 #取消息的最大直接数 log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除 log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能 zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
#Server Basics broker.id=1 每台服务器的broker.id都不能相同 #Socket Server Settings listeners=PLAINTEXT://192.168.0.113:9092 advertised.listeners=PLAINTEXT://192.168.0.113:9092 #hostname host.name=192.168.0.113 #Log Basics log.dirs=/usr/local/kafka/logs
num.partitions=3 #在log.retention.hours=168 下面新增下面三项 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #Zookeeper zookeeper.connect=192.168.0.113:2181,192.168.0.114:2181,192.168.0.115:2181
启动Kafka,进入kafka目录运行下面的命令,有两种方式
# 第一种方式(推荐) bin/kafka-server-start.sh -daemon config/server.properties # 第二种方式 bin/kafka-server-start.sh -daemon config/server.properties # 停止 bin/kafka-server-stop.sh
常用命令
#启动 bin/kafka-server-start.sh config/server.properties & #创建 topic bin/kafka-topics.sh --create --zookeeper 192.168.0.113:2181 --replication-factor 2 --partitions 3 --topic jeremytopic #显示 topic 信息 bin/kafka-topics.sh --describe --zookeeper 192.168.0.113:2181 --topic test #列出topic bin/kafka-topics.sh --list --zookeeper 192.168.0.113:2181 #删除 topic bin/kafka-topics.sh --delete --zookeeper 192.168.0.113:2181 --topic test #关闭服务 bin/kafka-server-stop.sh
查看kafka的group.id
kafka/config目录下的consumer.properties中可以看到
注意事项:
1、确保/etc/profile中环境配置是正确的,并在更新后启用(source /etc/profile)
2、开启kafka前必须先开启所有zookeeper
3、server.properties中broker.id不能重复
4、server.properties中listeners参数后面的ip地址最好写上
5、若server.properties中listeners,zookeeper.connect参数后填写的是ip地址(192.168.131.130等),则在运行命令时也应该使用ip地址代替localhost(如/usr/local/kafka/bin/kafka-console-producer.sh –broker-list 192.168.131.130:9092 –topic my-replicated-topic避免写为/usr/local/kafka/bin/kafka-console-producer.sh –broker-list localhost:9092 –topic my-replicated-topic)
6、出现内存不足的错误时,尝试关闭多余的程序,还是不行就重启了。
参考:
https://my.oschina.net/orrin/blog/1834218
https://blog.51cto.com/littledevil/2134694?source=dra
https://blog.csdn.net/a807557328/article/details/78272989
https://www.cnblogs.com/freeweb/p/7380492.html
https://www.cnblogs.com/luotianshuai/p/5206662.html