Topic相关
一、启动
# 1 加守护进程启动
bin/kafka-server-start.sh -daemon config/server.properties
# 2 通过后台来启动
nohup bin/kafka-server-start.sh config/server.properties &
二、查看当前kafka集群中Topic名字
bin/kafka-topics.sh --list --zookeeper127.0.0.1:2181
三、查看Topic详情:分区和副本等情况
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topicname
# 结果:Topic:topicname PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test0 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 1,0,2
Topic: test0 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test0 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2
四、创建Topic
bin/kafka-topics.sh --create --topic test0--zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
--topic后面的test0是topic的名称
--zookeeper应该和server.properties文件中的zookeeper.connect一样
--config指定当前topic上有效的参数值
--partitions指定topic的partition数量,如果不指定该数量,默认是server.properties文件中的num.partitions配置值
--replication-factor指定每个partition的副本个数,默认1个
五、删除Topic
bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic topicname
删除kafka中该topic相关的目录
重启kafka
六、查看topic消费到的offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test0 --time -1
# 运行结果
test0:0:177496
test0:1:61414
七、修改topic的partition数量(只能增加不能减少)
通过kafka-topics.sh工具的alter命令,将replicated-topic的partitions从1增加到3
bin/kafka-topics.sh --zookeeter localhost:2181 --alter --partitions 3 --topic replicated-topic
消费者指令
- 消费信息
# 从 latest 位移位置开始消费该主题的所有分区消息,即仅消费正在写入的消息。
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic topicName
- 从开始位置消费
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic topicName
- 显示key消费,打印出消息体的 key 和 value
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --property print.key=true --topic topicName