参考:https://kafka.apache.org
-
服务相关命令
1、启动/停止zk
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/zookeeper-server-stop.sh
2、启动kafka
> bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-stop.sh
3、配置多节点brokers集群
首先复制出两个配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
> bin/kafka-server-start.sh config/server-1.properties &
> bin/kafka-server-start.sh config/server-2.properties &
-
topic命令
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TEST_TOPIC
2、查看topic列表
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.
除了手动创建topic,还可以配置auto-create topics,当一个不存在的topic发布时可自动创建
3、查看某个topic信息
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic TEST_TOPIC
4、修改某个topic的partition个数(分区只能增不能减)
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic TEST_TOPIC --partitions 10
5、删除某个topic(只会删除zookeeper内的元数据,消息文件需要手动删除)
> bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic TEST_TOPIC
-
consumer相关
1、查看所有consumer的group列表
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
2、查看指定group(group_test)的消费情况
> bin/kafka-consumer-groups.sh --describe --group group_test --bootstrap-server localhost:9092
-
console相关
1、生产数据
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST_TOPIC
2、消费数据
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic TEST_TOPIC
-
其他
1、重置指定topic的offset
背景,因为程序写的不够好,辛苦造了2亿多数据跑入库测性能,跑到中间数据库服务器撑爆了,结果都写进日志了。。。
> bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group group_test --reset-offsets -to-offset 97000000 --topic TEST_TOPIC --execute
Other supported arguments:
--shift-by [positive or negative integer] - Shifts offset forward or backward from given integer.往前或往后加减offset
--to-current and --to-latest are same as --to-offset [integer] and --to-earliest.
--to-datetime [Datetime format is yyyy-MM-ddTHH:mm:ss.xxx]
> bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group group_test --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute
how to validata:
> bin/kafka-consumer-groups --bootstrap-server locahost:9092 --group <group_id> --describe
注意:这里重置的offset是针对每个partition而言,重置offset之前,所有的consumer必须是inactive的,也就是应用消费必须暂停