1、准备
服务器:node180 ,node181 ,node182
flume:kafka_2.12-2.0.1 (http://kafka.apache.org/downloads)
jdk:1.8.x
scala-2.12.11
2、解压
tar zxvf /opt/software/kafka_2.12-2.0.1.tgz -C /opt/module/
3、配置
#修改server.properties配置文件 # 不同机器取值不同,cluster1是1,cluster2是2 broker.id=1 #hostname和端口是用来建议给生产者和消费者使用的,如果没有设置,将会使用listeners的配置, #如果listeners也没有配置,将使用java.net.InetAddress.getCanonicalHostName()来获取 #这个hostname和port advertised.listeners=PLAINTEXT://node180:9092 #zookeeper 集群地址 zookeeper.connect=node180:2181,node181:2181,node182:2181 #设置topic 可以删除 delete.topic.enable=true #日志路径 log.dirs=/opt/module/kafka_2.12-2.0.1/logs
4、系统变量
vi /etc/profile
#kafka export KAFKA_HOME=/opt/module/kafka_2.12-2.0.1 export PATH=$KAFKA_HOME/bin:$PATH
source /etc/profile
5、同步其他机器
scp -r /opt/module/kafka_2.12-2.0.1/ root@node181:/opt/module/ scp -r /opt/module/kafka_2.12-2.0.1/ root@node182:/opt/module/
再次修改修改server.properties配置文件
(注意修改配置文件server.properties : broker.id ,advertised.listeners 根据实际情况填写)
6、启动
kafka-server-start.sh /opt/module/kafka_2.12-2.0.1/config/server.properties &
启动后回车
jps 查看
7、测试
创建topic:
kafka-topics.sh --create --zookeeper node180:2181,node181:2181,node182:2181 --replication-factor 3 --partitions 3 --topic mykafka
查看topic:
kafka-topics.sh --list --zookeeper node180:2181,node181:2181,node182:2181
查看指定Topic状态:
kafka-topics.sh --describe --zookeeper node180:2181,node181:2181,node182:2181 --topic mykafka
删除topic
连接:zookeeper-shell.sh --zookeeper node180:2181
查询:ls /brokers/topics
删除:rmr /brokers/topics/yourtopic
删除并确认topic是否存在:kafka-topics.sh --list --zookeeper node180:2181