a)kafka搭建
1、解压
2、修改配置/software/kafka_2.11-0.11.0.3/config/server.properties
broker.id=0
log.dirs=/var/huawei/kafka-logs
zookeeper.connect=node03:2181,node04,2181:node05:2181
delete.topic.enable=true
3、配置kafka启动脚本
a)创建文件
/software/kafka_2.11-0.11.0.3/bin/startKafka.sh
b)输入
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
c)修改启动脚本权限
chmod +x ./startKafka.sh
4、分发到node02,node03
5、修改broker.id
a)node02的/software/kafka_2.11-0.11.0.3/config/server.properties
broker.id=1
b)node02的/software/kafka_2.11-0.11.0.3/config/server.properties
broker.id=2
6、配置环境变量
export KAFKA_HOME=/software/kafka_2.11-0.11.0.3
export PATH=$PATH:$KAFKA_HOME/bin
7、脚本启动 到/software/kafka_2.11-0.11.0.3
创建startKafka.sh 添加以下命令
nohup /software/kafka_2.11-0.11.0.3/bin/kafka-server-start.sh /software/kafka_2.11-0.11.0.3/config/server.properties > kafka.log 2>&1 &
a)kafka命令
1).创建topic
zookeeper地址 操作方式 topic名字 分区个数 副本个数
kafka-topics.sh --zookeeper node03:2181,node04:2181,node05:2181 --create --topic test --partitions 3 --replication-factor 3
2).查看集群中的topic:
kafka-topics.sh --zookeeper node03:2181,node04:2181,node05:2181 --list
3).console当做消息的生产者
生产消息时指定kafka集群 指定生产到那个topic
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
4).console当做消息消费者
默认找2181端口,所以可以不写
kafka-console-consumer.sh --zookeeper node03,node04,node05 --topic test
从头消费
kafka-console-consumer.sh --zookeeper node03,node04,node05 --topic test --from-beginning
5).查看topic详细描述
进入zkCli.sh客户端
zookeeper中broker信息:get /brokers/topics/test/partitions/0/state
zookeeper中consumer信息:get /consumers/console-consumer-53584/offsets/test/2
命令:查看test的topic详细信息
kafka-topics.sh --zookeeper node03:2181,node04:2181,node05:2181 --describe --topic test
6).删除topic
kafka-topics.sh --zookeeper node03,node04,node05 --delete --topic test1
在/software/kafka_2.11-0.11.0.3/config/server.properties 配置 delete.topic.enable=true