一、相关基础内容
然后正常kafka的指令是 : ./bin/kafka-topics.sh --zookeeper hadoop300:2181 .......
但是使用CDH安装的kafka则不需要全写出此 ./bin/kafka-topics.sh 部分。只许直接写 kafka-topics 即可,这是很重要的一个区别,使用CDH安装的kafka时候要特别注意一下。
具体有哪些指令可以看此路径下:
/opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/bin
二、topic主题使用
所以我们使用topic的指令格式应该都类似:
kafka-topics --zookeeper hadoop300:2181/kafka ......
- 创建一个名为test 的主题(Topic):
kafka-topics --zookeeper hadoop300:2181/kafka --create -replication-factor 1 --partitions 3 --topic test
- 若是上述中的 ZooKeeper Root 的Kafka服务范围为: " / "。则这里的创建主题指令改为:
kafka-topics --zookeeper hadoop300:2181--create -replication-factor 1 --partitions 3 --topic test
- 删除创建的topic
kafka-topics --zookeeper hadoop300:2181/kafka --delete --topic test
这里如果直接删除,则会输出 Topic *** is marked for deletion
如果我们topic中消息堆积的太多,或者kafka所在磁盘空间满了等等,则会需要彻底清理一下kafka topic。
方法一:修改kafaka配置文件server.properties
, 添加 delete.topic.enable=true
,重启kafka,之后通过kafka命令行就可以直接删除topic。
方法二:通过命令行删除topic: ./bin/kafka-topics.sh --delete --zookeeper {zookeeper server} --topic {topic name}
因为kafaka配置文件中server.properties没有配delete.topic.enable=true
,此时的删除并不是真正的删除,只是把topic标记为:marked for deletion
你可以通过命令:./bin/kafka-topics --zookeeper {zookeeper server} --list
来查看所有topic
方法三:若需要真正删除它
需要登录zookeeper客户端:zookeeper-client
找到topic所在的目录:ls /kafka/brokers/topics
执行命令,即可,此时topic被彻底删除:rmr /kafka/brokers/topics/{topic name}
- 修改topic的分区数
kafka-topics --zookeeper hadoop300:2181/kafka --alter --topic test partitions 5
- 接着启动消费者
kafka-console-consumer --bootstrap-server hadoop300:9092 --topic test --from-beginning
- 查看消费数据后的偏移量 kafka-run-class
(1)查看每个Partition的最新偏移量 kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic --time -1 (2)查看每个Partition的最早的偏移量 kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic --time -2 (3)查看consumer组内消费的offset kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper hadoop:2181 --topic yourTopic
- 获取topic消费组的偏移量
kafka-consumer-offset-checker --zookeeper=hadoop300:2181 --topic=mytopic --group=my_consumer_group