环境
- CentOS7
- kafka2.5.0
准备
先搭建zookeeper,参考:https://www.cnblogs.com/caroar/p/13172921.html
下载&配置
mkdir /kafka
cd /kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -zxvf kafka_2.12-2.5.0.tgz
mv kafka_2.12-2.5.0 kafka1
cd kafka1
vi config/server.properties
修改broker.id=1
修改listeners=PLAINTEXT://127.0.0.1:9091 #这个配置默认是注释的,默认9092,如果在多台机器上搭建集群,这个可以不用动,单机搭得改,不然三个服务全都默认9092就冲突了
修改log.dirs=/kafka/kafka1/logs
修改zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
cd /kafka
cp -r kafka1 kafka2
cp -r kafka1 kafka 3
vi kafka2/config/server.properties
修改broker.id=2
修改listeners=PLAINTEXT://127.0.0.1:9092
修改log.dirs=/kafka/kafka2/logs
vi kafka3/config/server.properties
修改broker.id=3
修改listeners=PLAINTEXT://127.0.0.1:9093
修改log.dirs=/kafka/kafka3/logs
启动
cd /kafka/kafka1
nohup sh bin/kafka-server-start.sh config/server.properties & #这里为什么用nohup因为不然启动起来这个控制台就一直被占着了
cd ../kafka2
nohup sh bin/kafka-server-start.sh config/server.properties &
cd ../kafka3
nohup sh bin/kafka-server-start.sh config/server.properties &
创建Topic(1个分区1个备份)
cd /kafka/kafka1/bin
sh kafka-topics.sh --create --bootstrap-server 127.0.0.1:9091 --replication-factor 1 --partitions 1 --topic test
sh kafka-topics.sh --list --bootstrap-server 127.0.0.1:9091
发送消息
sh kafka-console-producer.sh --bootstrap-server 127.0.0.1:9091 --topic test
# 这里一行一行的输入消息
接收消息
sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9091 --topic test --from-beginning
创建Topic(1个分区3个备份)
sh kafka-topics.sh --create --bootstrap-server 127.0.0.1:9091 --replication-factor 3 --partitions 1 --topic my-replicated-topic
sh kafka-topics.sh --list --bootstrap-server 127.0.0.1:9091
sh kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9091 --topic my-replicated-topic # 查看当前的leader等信息
备注
官方指引:http://kafka.apache.org/quickstart
问题
注意 0.8.2.1 版本中 consumer.pull 方法里面直接返回 null 的,也是很神奇,高版本里面是有代码的。