1. Create a topic
[root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 3 --partitions 3 --topic test
# CDH version
kafka-topics --zookeeper cm1:2181,cm2:2181,cm3:2181 --create --replication-factor 3 --partitions 3 --topic test
Parameter description:
- --replication-factor: number of replicas for each partition, default 1 (the count includes the partition's own leader replica)
- --partitions: number of partitions for the topic being created, default 1
- --topic: name of the topic to create
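On newer Kafka releases (2.2 and later) kafka-topics.sh can also talk to the brokers directly instead of going through ZooKeeper; a rough equivalent of the command above, assuming the brokers listen on port 9092:
bin/kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --create --replication-factor 3 --partitions 3 --topic test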
2. List all topics in the cluster
[root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
test
# CDH version
kafka-topics --zookeeper cm1:2181,cm2:2181,cm3:2181 --list
3. Delete a topic
[root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --delete --topic test
# CDH version
kafka-topics --zookeeper cm1:2181,cm2:2181,cm3:2181 --delete --topic test
Deleting a topic in a Kafka cluster only marks it for deletion.
A topic marked for deletion can still be read; its data is only removed automatically once the retention period (7 days by default) expires.
For the delete command to remove the topic immediately, set delete.topic.enable=true in config/server.properties.
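A minimal sketch of the relevant lines in config/server.properties (the retention value shown is Kafka's default; adjust as needed):
# allow the delete command to physically remove topics
delete.topic.enable=true
# how long log data is kept before automatic cleanup (168 hours = 7 days)
log.retention.hours=168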
4. Start a console producer and send messages
[root@node01 kafka]$ bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
>hello world
>cw cw
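The console producer reads one message per line from standard input, so an existing file can also be piped in (messages.txt here is just a hypothetical file name):
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test < messages.txt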
5. Start a console consumer and consume messages
# Before Kafka 0.9: point the consumer at ZooKeeper
[root@node01 kafka]$ bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test
# Kafka 0.9 and later: point the consumer at the Kafka brokers
[root@node01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test
[root@node01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic test
# CDH version
kafka-console-consumer --bootstrap-server cm1:9092,cm2:9092,cm3:9092 --topic test
# When typing messages in the producer console, press Ctrl+Backspace to delete characters
--from-beginning: reads all of the topic's existing data from the start.
Because the topic has three partitions, ordering is only guaranteed within each partition; there is no ordering guarantee across partitions.
If the producer keeps sending data while the consumer is running, the consumer displays messages in production order, because each message is consumed as soon as it is produced, so the output appears ordered.
In short: ordering is guaranteed within a single partition, not across multiple partitions.
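If per-key ordering matters, one option is to send keyed messages so that every message with the same key lands in the same partition. A sketch using the console producer's key-parsing properties (the keys and values below are just examples):
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test --property parse.key=true --property key.separator=:
>user1:login
>user1:click
>user1:logout
All three messages carry the key user1, are hashed to the same partition, and are therefore consumed in the order they were produced.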
6. Describe a topic
[root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic test
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: test Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: test Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
# CDH version
$ kafka-topics --zookeeper cm1:2181,cm2:2181,cm3:2181 --describe --topic test
- Partition: the partition ID
- Leader: the broker ID of the broker that leads this partition
- Replicas: broker IDs that hold a replica of the partition (listed in preferred-replica order)
- Isr: the in-sync replica set, i.e. the replicas that are fully caught up with the leader
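The same --describe command also accepts filters; for example, --under-replicated-partitions lists only partitions whose Isr is smaller than the replica list, which is a quick health check after a broker failure:
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --under-replicated-partitions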
7. Change the number of partitions
Note: the number of partitions for a topic can only be increased, never decreased, because the existing partitions already contain data.
[root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --alter --topic test --partitions 6
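To confirm the change, describe the topic again; the output should now list six partitions (0 through 5):
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic test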
8. Inspect consumer and broker information in ZooKeeper
# Start the ZooKeeper client:
./zkCli.sh
# View topic-related information:
[zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics/test/partitions
[0]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/test/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/test/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 6] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}
cZxid = 0x4f00000113
ctime = Sun Jul 28 17:17:33 CST 2019
mZxid = 0x4f00000113
mtime = Sun Jul 28 17:17:33 CST 2019
pZxid = 0x4f00000113
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0
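Broker registration data lives under /brokers/ids in the same tree; the broker IDs listed should match the Leader/Replicas IDs from section 6, and get /brokers/ids/<id> returns that broker's host and port as JSON:
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2]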
# View consumer-related information:
[zk: localhost:2181(CONNECTED) 7] ls /consumers
[console-consumer-21115]
[zk: localhost:2181(CONNECTED) 8] ls /consumers/console-consumer-21115
[ids, owners, offsets]
[zk: localhost:2181(CONNECTED) 9] ls /consumers/console-consumer-21115/offsets
[test]
[zk: localhost:2181(CONNECTED) 10] ls /consumers/console-consumer-21115/offsets/test
[0]
[zk: localhost:2181(CONNECTED) 11] ls /consumers/console-consumer-21115/offsets/test/0
[]
[zk: localhost:2181(CONNECTED) 12] get /consumers/console-consumer-21115/offsets/test/0
2
cZxid = 0x4f0000011e
ctime = Sun Jul 28 17:18:23 CST 2019
mZxid = 0x4f0000011e
mtime = Sun Jul 28 17:18:23 CST 2019
pZxid = 0x4f0000011e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
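Only the old ZooKeeper-based consumers (the pre-0.9 form in section 5) register themselves under /consumers. Consumers started with --bootstrap-server keep their offsets in Kafka's internal __consumer_offsets topic instead, and can be inspected with kafka-consumer-groups.sh (the group name below is just the one from this session; yours will differ):
bin/kafka-consumer-groups.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --describe --group console-consumer-21115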
9. How to completely delete data from Kafka
- Deleting a topic in a Kafka cluster only marks it for deletion. A topic marked for deletion can still be read; its data is only removed automatically after the retention period expires. To make the delete command remove the topic directly (supported since the 0.8.x releases), set delete.topic.enable=true in config/server.properties.
[root@node01 ~]# kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
test
[root@node01 ~]# kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
- Delete the topic's actual data on every broker node (the directory used here is this cluster's data directory; see the note after this list).
[root@node01 ~]# cd /opt/module/kafka/logs
[root@node01 logs]# ls
cleaner-offset-checkpoint  log-start-offset-checkpoint  meta.properties  recovery-point-offset-checkpoint  test-0  test-1  test-2  replication-offset-checkpoint
[root@node01 logs]# rm -rf ./test-0
[root@node01 logs]# rm -rf ./test-1
[root@node01 logs]# rm -rf ./test-2
[root@node02 logs]# rm -rf ./test-0
[root@node02 logs]# rm -rf ./test-1
[root@node02 logs]# rm -rf ./test-2
[root@node03 logs]# rm -rf ./test-0
[root@node03 logs]# rm -rf ./test-1
[root@node03 logs]# rm -rf ./test-2
- Enter the ZooKeeper client and delete the topic metadata.
[root@node01 logs]# zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 1] rmr /brokers/topics/test
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[]
- Delete the topic's metadata under /config.
[zk: localhost:2181(CONNECTED) 5] ls /config/topics
[test]
[zk: localhost:2181(CONNECTED) 6] rmr /config/topics/test
[zk: localhost:2181(CONNECTED) 7] ls /config/topics
[]
- Delete the topic's marked-for-deletion entry in ZooKeeper.
[zk: localhost:2181(CONNECTED) 9] rmr /admin/delete_topics/test
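As noted in the second step above, the partition directories (test-0, test-1, ...) live under whatever data directory the brokers are configured with; /opt/module/kafka/logs is simply this cluster's setting. The relevant line in config/server.properties looks like:
# directory (or comma-separated list of directories) where Kafka stores partition data
log.dirs=/opt/module/kafka/logs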