下载KAFKA
http://kafka.apache.org/downloads
解压并修改配置文件
- vim config/server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#用来监听链接的端口,producer或consumer将在此端口建立链接
port=9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#日志存放路径
log.dirs=/home/hadoop/logs/kafka
#topic在当前broker上的分片数量
num.partitions=2
#恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间(超时将被删除)
log.retention.hours=168
#滚动生成新segment文件的最大时间
log.roll.hours=168
分发文件,修改配置文件
- 依次修改各服务器上配置文件的的broker.id,不可重复
启动KAFKA
./bin/kafka-server-start.sh config/server.properties
常用操作命令
- 查看当前服务器中的所有topic
./kafka-topics.sh --list --zookeeper master:2181
- 创建topic
./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 3 --topic kafka
- 删除topic
./kafka-topics.sh --delete --zookeeper master:2181 --topic kafka
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
- 查看消费位置
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper master:2181 --group testGroup
- 查看某个topic详情
./kafka-topics.sh --topic kafka --describe --zookeeper master:2181
单机连通性测试/分布式连通性测试
- 运行producer:
./kafka-console-producer.sh --broker-list [host-name]:9092 --topic kafka
- 运行consumer:
./kafka-console-consumer.sh --zookeeper [host-name]:2181 --topic kafka --from-beginning
- 在producer端输入字符串并回车,查看consumer端是否显示。
- 单机测试host-name为localhost,分布式测试host-name为当前机器name
异常
- 下错kafka文件。如果下载的是src文件会报该异常
[hadoop@slave2 kafka-0.10.2.0-src]$ ./bin/kafka-server-start.sh config/server.properties
Error: Could not find or load main class config.server.properties
- ERROR Producer connection to slave2:9092 unsuccessful
程序中代码为:
props.put("metadata.broker.list", "47.92.67.x:9092,47.92.37.x:9092,47.92.37.x:9092");
问题出在本地机器hosts文件中没添加映射