指令参数就不解释了,很容易看明白。
如果允许下指令create/delete topic, config 文件需要更改参数:
delete.topic.enable=true
auto.create.topics.enable=true
create topic
./kafka-topics.sh --create --zookeeper zk0:10199,zk1:10198,zk2:10197 --replication-factor 3 --partitions 3 --topic topicName
delete topic
./kafka-topics.sh --delete --zookeeper zk0:10199,zk1:10198,zk2:10197 --topic topicName
list topic
./kafka-topics.sh --describe --zookeeper zk0:10199,zk1:10198,zk2:10197
modify topic parameters
./kafka-topics.sh --zookeeper zk0:10199,zk1:10198,zk2:10197 --alter --partitions 3 --topic topicName
即:无论当前指定的topic中的partition有几个,最终改为3
consumer command
/kafka-console-consumer.sh --bootstrap-server kfk0:11199,kfk1:11198,kfk2:11197 --topic streams-wordcount-output --from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
最简单的将log直接谢入kafka中。
tail -n 0 -f /var/log/xxxx.log | bin/kafka-console-producer.sh --broker-list kfk0:10199 --sync --topic topicName
kafka-console-producer.sh 参数很多,不一一列举了。
指令很多,好好利用一下bin下的工具。