以三台为例,先安装一台,然后分发;
一、准备
1、下载
kafka_2.11-2.0.1.tgz
前面的数字2.11是scala的版本,2.0.1是kafka的版本;
2、前提
前提是已经搭建好了zookeeper集群,这里zookeeper也是三台;
二、搭建
1、解压包即可
tar zxf kafka_2.11-0.11.0.0.tgz -C /usr/local/
#也可以改个名字,用mv命令
2、配置
##编辑配置文件 vi /usr/local/kafka/config/server.properties #broker的全局唯一编号,不能重复,依次增长 broker.id=0 #默认值,往kafka里写数据用的端口 port=9092 #删除topic功能使能 delete.topic.enable=true #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘IO的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #往kafka写数据,将数据存放的位置 log.dirs=/opt/module/kafka/logs #topic在当前broker上的分区个数 num.partitions=1 #用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment文件保留的最长时间,超时将被删除 log.retention.hours=168 #配置连接Zookeeper集群地址 zookeeper.connect=192.168.1.135:2181,192.168.1.136:2181,192.168.1.137:2181 #连接zookeeper的超时时间 zookeeper.connection.timeout.ms=1000000
3、分发kafka
用scp把kafka拷贝到另外两台主机即可;
唯一区别的,就是server.properties中的broker.id,要设置为1和2
三、启动kafka
1、启动zookeeper集群
2、启动kafka集群
##因为kafka没有启动命令,只能通过脚本启动,可以自己写个脚本,将启动命令包进去
比如,在kafka的bin目录中,写个脚本:startkafka.sh ,kafka的具体路径,根据自己的定;
vim startkafka.sh 内容如下:
KAFKA_PATH="/opt/kafka_2.11-0.10.2.1"
nohup ${KAFKA_PATH}/bin/kafka-server-start.sh ${KAFKA_PATH}/config/server.properties > kafka.log 2>&1 &
##然后分发这个脚本,并赋予权限
##最后执行这个脚本启动(集群所有主机)
##jps 命令查看
3、部分命令
1、查看当前服务器中的所有topic [root@spark1 local]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper spark1:2181 --list 2、创建topic [root@spark1 local]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper spark1:2181 --create --replication-factor 3 --partitions 1 --topic first1 --topic 定义topic名 --replication-factor 定义副本数 --partitions 定义分区数 当前可用kafka的broker为3,当想创建replication-factor为4时,报错: [root@spark1 local]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper spark1:2181 --create --replication-factor 4 --partitions 1 --topic first2 3、删除topic [root@spark1 local]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper spark1:2181 --delete --topic first1 需要在server.properties中设置delete.topic.enable=true否则只是标记删除;
标记删除:
当没有设置delete.topic.enable=true时,删除topic是标记删除,需要去数据目录中删除数据文件,然后用命令删除topic,
还要去zookeeper里删除一些元数据:
[zk: localhost:2181(CONNECTED) 1] rmr /brokers/topics/topic名字
[zk: localhost:2181(CONNECTED) 1] rmr /admin/delete_topics/topic名字
4、查看某个topic的详情
[root@spark1 local]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper spark1:2181 --describe --topic first1
5、更多
[root@spark1 local]# /usr/local/kafka/bin/kafka-topics.sh --help
6、生产、消费消息
//生产者:
[root@spark1 local]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list spark1:9092 --topic first1 hallo kafka
//消费者:
[root@spark1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper spark1:2181 --topic first1 --from-beginning hallo kafka
7、查看producer生产消息的最大位置(一周过期)
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark1:9092 --topic t0425 --time -1
-1表示查询topic各个分区当前最大的消息位移值(这里的位移不是consumer端的位移,而是指消息在每个分区的位置);
如果你要查询曾经生产过的最大消息数,那么只运行上面这条命令然后把各个分区的结果相加就可以了。
但如果你需要查询当前集群中该topic的消息数,那么还需要运行下面这条命令:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark1:9092 --topic t0425 --time -2
-2表示去获取当前各个分区的最小位移,之后把运行第一条命令的结果与刚刚获取的位移之和相减就是集群中该topic的当前消息总数。