kafka相关的常用命令
后台启动kafka
kafka-server-start.sh /opt/app/kafka_2.11-1.1.0/config/server.properties > /dev/null 2>&1 &
关闭kafka
kafka-server-stop.sh
创建topic
topic的添加和修改使用下面命令
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
副本(replication)j控制每条消息在服务器中的备份,如果有3个副本,那么允许最多有2个节点宕机才能不丢数据,集群中推荐设置2或3个副本,才不会中断数据消费。
分区(partition)控制topic将分片成多少log。关于分区数的影响,首先每个分区必须完全安装在独立的服务器上。因此,如果你有20个分区的话(读和写的负载),那么处理完整的数据集不要超过20个服务器(不计算备份)。最后的分区数影响你的消费者的最大并行。命令行上添加的配置覆盖服务器的默认设置。
例如,创建topic带有4个分区,2个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
修改删除topic
添加分区:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partition 40
添加分区不能改变现有的数据,如果分区被使用中,这就可能扰乱消费者。如果数据通过哈希划分,那么该分区将通过添加分区进行洗牌,但kafka不以任何方式自动分配数据。
添加配置:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
移除配置:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x=y
删除topic:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
主题删除选项默认是关闭的,设置服务器配置开启它。
delete.topic.enable=true
查看topic列表
kafka-topics.sh --list --zookeeper localhost:2181
查询集群描述
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
生产者(向topic写入数据)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
生产者(支持0.9-1.1.1版本)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
消费者(从topic消费数据)(0.9以下旧版写法),间接通过zookeeper端口2181消费数据
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
消费者(支持0.9-1.1.1版本),直接通过kafka端口9092消费数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
消费者(支持2.0.0版本+),直接通过kafka端口9092消费数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties
注:“--new-consumer”已经不能使用啦
消费者组列表查询(支持0.9-1.1.1版本)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
消费者组列表查询(支持2.0.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
显示某个消费组的消费详情(支持0.9版本)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
可能遇到的问题
kafka消费者查询出错:https://blog.csdn.net/getyouwant/article/details/79000524
参考资料: