I. Environment Preparation
Environment: one CentOS 7.3 server
Software version: kafka_2.12
Install directory: /usr/local/kafka
Listen port: 9092
Config file: /usr/local/kafka/config/server.properties
Install the Java dependency via yum:
yum install java-1.8.0-openjdk
A single-node ZooKeeper is also required.
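For the single-node case, the ZooKeeper bundled inside the Kafka tarball is enough; a minimal sketch, assuming the /usr/local/kafka layout used below (the actual start line is commented out so nothing runs by accident):

```shell
# The Kafka distribution ships a ZooKeeper start script and a default
# config; together they bring up a single-node ZooKeeper on port 2181.
KAFKA_HOME=/usr/local/kafka
ZK_START="$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties"
# eval "$ZK_START"   # run this on the server once Kafka is unpacked
echo "$ZK_START"
```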
II. Installation
1. Download the Kafka package:
wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz
2. Extract, move it into place, then create the log directory:
tar zxvf kafka_2.12-2.1.0.tgz
mv kafka_2.12-2.1.0 /usr/local/kafka
mkdir /var/log/kafka
3. Edit the config file and change the ZooKeeper address near the end:
vim /usr/local/kafka/config/server.properties
# ZooKeeper address and port
zookeeper.connect=127.0.0.1:2181
III. Verification
Start the broker:
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
Check that the port is listening:
netstat -unltp | grep 9092
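If the broker is up, the netstat pipe above prints a LISTEN line for 9092. A sketch of the expected shape, filtering a captured sample line the same way (the PID is made up):

```shell
# Illustrative only: this is the kind of line a healthy broker produces.
SAMPLE='tcp6       0      0 :::9092        :::*        LISTEN      12345/java'
echo "$SAMPLE" | grep -q ':::9092' && echo "broker listening on 9092"
```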
I. Environment Preparation
Environment: three CentOS 7.3 servers
Software version: kafka_2.12
Install directory: /usr/local/kafka
Listen port: 9092
Config file: /usr/local/kafka/config/server.properties
Install the Java dependency via yum (on all 3 nodes):
yum install java-1.8.0-openjdk
A ZooKeeper cluster is also required.
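Deploying the ZooKeeper ensemble itself is out of scope here; for orientation only, the server list in each node's zoo.cfg would contain entries like the fragment below (IPs assumed to match the Kafka config in step 3; 2888/3888 are ZooKeeper's standard quorum and leader-election ports, and each node also needs a matching myid file).

```
# zoo.cfg excerpt (identical on all three nodes)
server.1=192.168.1.189:2888:3888
server.2=192.168.1.190:2888:3888
server.3=192.168.1.191:2888:3888
```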
II. Installation
1. Download the Kafka package (on all 3 nodes):
wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz
2. Extract, move it into place, then create the log directory (on all 3 nodes):
tar zxvf kafka_2.12-0.10.2.1.tgz
mv kafka_2.12-0.10.2.1 /usr/local/kafka
mkdir /var/log/kafka
3. Edit the config file (on all 3 nodes; the values below differ per node):
vim /usr/local/kafka/config/server.properties
# 1 on the first node, 2 on the second, 3 on the third
broker.id=1
# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true
# Hostname/IP advertised to clients (this node's address)
advertised.host.name=192.168.1.189
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# Log (message data) directory
log.dirs=/var/log/kafka
# Default number of partitions per topic
num.partitions=10
# Number of replicas Kafka keeps of each message
default.replication.factor=3
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
# Message retention period
log.retention.hours=48
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# ZooKeeper addresses and ports
zookeeper.connect=192.168.1.189:2181,192.168.1.190:2181,192.168.1.191:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
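Since broker.id (and the advertised address) are the only per-node differences called out above, one way to stamp the id per host is a quick sed. A sketch on a throwaway copy of the file; the NODE_ID variable is an assumption, derive it however fits your inventory:

```shell
# Demo on a scratch copy so the real config is untouched.
conf=/tmp/server.properties.demo
printf 'broker.id=1\ndelete.topic.enable=true\n' > "$conf"
NODE_ID=2                       # 1, 2 or 3 depending on the host
sed -i "s/^broker.id=.*/broker.id=${NODE_ID}/" "$conf"
grep '^broker.id=' "$conf"      # → broker.id=2
```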
III. Verification
Start the broker (on all 3 nodes):
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
Check that the port is listening:
netstat -unltp | grep 9092
I. Basic Operations
1. Start Kafka:
bin/kafka-server-start.sh -daemon config/server.properties
2. Stop Kafka:
bin/kafka-server-stop.sh
3. Check that a topic is healthy across the cluster (output with no errors means it is fine):
bin/kafka-topics.sh --describe --zookeeper 1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181 --topic MyTopic
4. List all topics of the Kafka cluster registered in this ZooKeeper:
bin/kafka-topics.sh --list --zookeeper 1.1.1.1:2181
5. Show the details of a topic:
bin/kafka-topics.sh --describe --zookeeper 1.1.1.1:2181 --topic <topic-name>
6. Create a topic with a replication factor of 1 and 1 partition:
bin/kafka-topics.sh --create --zookeeper 1.1.1.1:2181 --replication-factor 1 --partitions 1 --topic <topic-name>
7. Delete a topic:
bin/kafka-topics.sh --zookeeper 1.1.1.1:2181 --delete --topic <topic-name>
8. Trigger a preferred replica election, which moves partition leadership back to each partition's preferred replica so leaders are spread evenly across brokers instead of piling up on a few nodes after failovers:
bin/kafka-preferred-replica-election.sh --zookeeper 1.1.1.1:2181
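The topic commands above differ only in their flags, so they wrap naturally into small shell functions. A sketch assuming the /usr/local/kafka install path and the placeholder ZooKeeper address used in the examples; the real calls only work on a node with Kafka installed, so the trial invocation is left commented out:

```shell
KAFKA=/usr/local/kafka
ZK=1.1.1.1:2181                 # placeholder, as in the examples above

# Thin wrappers over kafka-topics.sh for day-to-day topic management.
topic_list()     { "$KAFKA/bin/kafka-topics.sh" --list --zookeeper "$ZK"; }
topic_describe() { "$KAFKA/bin/kafka-topics.sh" --describe --zookeeper "$ZK" --topic "$1"; }
topic_create()   { "$KAFKA/bin/kafka-topics.sh" --create --zookeeper "$ZK" --replication-factor 3 --partitions 10 --topic "$1"; }
topic_delete()   { "$KAFKA/bin/kafka-topics.sh" --zookeeper "$ZK" --delete --topic "$1"; }

# topic_create demo.topic && topic_describe demo.topic
```

The replication factor of 3 and 10 partitions mirror the cluster config from the previous section; adjust per topic as needed.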
II. Produce/Consume Test
Start a producer in sync mode (info.test is the topic name):
bin/kafka-console-producer.ssh --broker-list 1.1.1.1:9092,1.1.1.2:9093,1.1.1.3:9094 --sync --topic info.test
Then type the following:
Hello, world!
Start a consumer in another terminal:
bin/kafka-console-consumer.sh --zookeeper 1.1.1.1:2181 --topic info.test --from-beginning
Watch its output; you should see:
Hello, world!
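For scripted tests, the console producer also reads messages from stdin, so the interactive typing step can be replaced by a pipe. A sketch using the same broker list and topic; the producer line is commented out since it only works where Kafka is installed:

```shell
MSG='Hello, world!'
# echo "$MSG" | /usr/local/kafka/bin/kafka-console-producer.sh \
#   --broker-list 1.1.1.1:9092,1.1.1.2:9093,1.1.1.3:9094 --sync --topic info.test
echo "$MSG"
```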