以下操作基于Linux系统
1. 创建一个Topic
用一个分区和一个副本创建一个名为“test”的Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2. 查看Topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
3. 发送消息
运行生产者,然后在控制台输入一些消息(以行的形式)发送到服务器,CTRL+C 终止消息的输入
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> message 1
> message 2
4. 查看收到的消息
运行消费者,将消息转储到标准输出
实时显示从最初到现在的消息:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
实时查看消息:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
5. 查看消费了多少数据
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 127.0.0.1:2181 --group hansight --topic event
6. 查看group
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
7. 导入/导出数据
1)准备一些数据,输出到当前目录下的 test.txt 文件
echo -e "test_data01 test_data02" > test.txt
2)启动两个独立模式运行的连接器
执行 connect-standalone.sh,携带三个配置文件作为参数:kafka connect过程的配置、创建连接器的配置
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
第一个连接器:源连接器,用于从输入文件中按行读取数据,并将每个连接生成为Topic
第二个连接器:接收器连接器,从Topic中读取消息,并在文件中已行的形式输出消息
启动后,源连接器开始读取 test.txt,并生成Topic:connect-test;接收器连接器开始读取 connect-test 中的消息,并写入到文件:test.sink.txt
可以在控制台查看Topic中的数据:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
8. 删除Topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
如果提示:marked for deletion,则并未真正删除,需要修改配置文件 kafka/config/server.properties,将 delete.topic.enable=false 改为 true,保存后重启kafka,再执行删除命令即可
9. 查看Topic详情
bin/kafka-topics.sh --zookeeper localhost::2181 --topic test --describe
参考文档:kafka.apache.org/quickstart —— kafka官方文档