开启关闭:
1、开启kafka服务:
bin/kafka-server-start.sh config/server.properties >logs/kafka-server.log 2>&1 &
>logs/kafka-server.log :将产生的日志输入到logs/kafka-server.log文件
2>&1:将错误输出的日志重定向为到标准日志输出的文件中
&:最后一个&代表后台运行
2、关闭kafka服务:
bin/kafka-server-start.sh
增删改查:
1、 创建新的topic
bin/kafka-topics.sh --create --zookeeper node167:2181,node168:2181,node169:2181 --replication-factor 3 --partitions 2 --topic world
--zookeeper:必须添加
--replication-factor:副本数(replication-factor不能大于broker数)
--partitions:分区数
在创建topic时,需要知道创建几个副本,包含几个分区。
2、查看topic
bin/kafka-topics.sh --list --zookeeper node167:2181,node168:2181,node169:2181
3、查看world的详细信息
bin/kafka-topics.sh --describe --zookeeper node167:2181,node168:2181,node169:2181 --topic world
--describe :详细信息
4、修改topic的信息
bin/kafka-topics.sh --alter --zookeeper node167:2181,node168:2181,node169:2181 --partitions 3 --topic hello
--alter :修改
注意:修改分区数量时只能增加不能减少
5、删除topic
bin/kafka-topics.sh --delete --zookeeper node167:2181,node168:2181,node169:2181 --topic hello
--delete:删除
查看topic列表:
--delete不能直接删除,只是标记删除。想要真正删除需要在server中配置delete.topic.enable=true,配置完成之后重启kafka,就可以直接删除了。
生产消费:
1、创建生产者:
bin/kafka-console-producer.sh --broker-list node167:9092,node168:9092,node169:9092 --topic hello
创建生产者时不需要指定 --zookeeper等信息。
因为生产者只需要向已经存在的topic中添加数据就可以了,不要知道topic的位置,以及副本数什么的。
创建时读取producer.properties配置文件,所以端口是9092
2、创建消费者:
bin/kafka-console-consumer.sh --zookeeper node167:2181,node168:2181,node169:2181 --topic hello --from-beginning
创建时读取consumer.properties配置文件,所以端口是2181
其他:
1、覆盖原server配置项
--config:覆盖原server配置项