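A Kafka cluster depends on ZooKeeper, so to deploy a distributed Kafka cluster you first need to set up a ZooKeeper cluster. I covered how to do that in another post, so I won't repeat it here. Once the ZooKeeper cluster is in place, deploying Kafka is much simpler: all you need is the Kafka release package, which can be downloaded from the official Kafka site: http://kafka.apache.org/downloads.html
I am using version 0.11.0, which can be downloaded here: https://t00y.com/file/28012292-455761212
After uploading it to one of the servers in the cluster, extract it: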
$ tar zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module
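In the Kafka installation directory (the configuration below assumes /opt/module/kafka, so rename the extracted folder if needed), create a data folder to hold topic messages; by default Kafka keeps them on local disk for 7 days: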
mkdir data
Go to Kafka's config folder, which holds the configuration files. The most important one is server.properties, which contains the cluster-related settings. Edit it as follows:
# The id of the broker. This must be set to a unique integer for each broker.
# This is the broker's globally unique ID; it must not be repeated anywhere in the cluster
broker.id=0
# Switch to enable topic deletion or not, default value is false
# Set this to true; otherwise topics cannot actually be deleted
delete.topic.enable=true
############################# Socket Server Settings #############################
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.182.101:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
# Where topic messages are stored
log.dirs=/opt/module/kafka/data
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
#log.flush.interval.ms=1000
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Addresses of the ZooKeeper cluster to connect to
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181
……
Configure the environment variables: vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
After editing, don't forget to run: source /etc/profile
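A quick way to confirm that the profile change took effect is to check the two variables from the current shell. The sketch below is only an illustration: the function name kafka_env_ok is made up for this example and is not part of Kafka.

```shell
# Hypothetical helper (not part of Kafka): succeed only if KAFKA_HOME is set
# and $KAFKA_HOME/bin appears as an entry in PATH.
kafka_env_ok() {
  if [ -z "${KAFKA_HOME:-}" ]; then
    echo "KAFKA_HOME is not set" >&2
    return 1
  fi
  # Wrap PATH in colons so that first, middle, and last entries all match.
  case ":$PATH:" in
    *":$KAFKA_HOME/bin:"*) return 0 ;;
    *) echo "$KAFKA_HOME/bin is not on PATH" >&2; return 1 ;;
  esac
}
```

Calling kafka_env_ok right after source /etc/profile should return success without printing anything.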
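Finally, distribute the kafka folder to every node in the cluster, then be sure to change broker.id in each node's server.properties so that no two brokers share an ID. Because advertised.listeners above is set to a specific IP address, it also has to be changed to each node's own address. The environment variables must be configured on every node as well.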
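Editing broker.id by hand on every node is easy to get wrong, so here is a minimal sketch of how the change could be scripted. The function name set_broker_id is hypothetical, and the sketch only touches the uncommented broker.id line; everything else in the file is left as it is.

```shell
# Hypothetical helper (not from Kafka): rewrite the broker.id line of a
# server.properties file in place.
# Usage: set_broker_id <path/to/server.properties> <integer id>
set_broker_id() {
  local file=$1 id=$2
  local tmp
  # Reject anything that is not a non-negative integer.
  case $id in
    ''|*[!0-9]*) echo "broker id must be a non-negative integer: '$id'" >&2; return 1 ;;
  esac
  tmp=$(mktemp) || return 1
  # Replace only the uncommented broker.id line, then write the result back
  # through the original file so its permissions are preserved.
  sed "s/^broker\.id=.*/broker.id=$id/" "$file" > "$tmp" && cat "$tmp" > "$file"
  local status=$?
  rm -f "$tmp"
  return $status
}
```

On the second node, for example, this could be called as set_broker_id /opt/module/kafka/config/server.properties 1, with a different number on each remaining node.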
To start the cluster, run kafka-server-start.sh from Kafka's bin folder on each node, passing a few command-line arguments:
$ bin/kafka-server-start.sh -daemon config/server.properties
To shut the cluster down, run the corresponding stop script on each node:
$ bin/kafka-server-stop.sh
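Starting the whole cluster node by node like this is tedious. The start and stop commands above can be wrapped in a shell script that starts or stops the entire cluster in one go; see my other post for details.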
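For readers who want a starting point right away, the following is a rough sketch of such a wrapper. It is not the script from the other post. It assumes passwordless SSH to every node, the same install path on all of them, and that the brokers run on hadoop101 through hadoop103; the names kafka_cluster, kafka_remote_cmd, KAFKA_HOSTS, and DRY_RUN are invented for this example.

```shell
# Sketch of a cluster-wide start/stop helper. Assumptions (not from the post):
# passwordless SSH to every node, Kafka installed at the same path everywhere,
# and brokers running on the hosts listed in KAFKA_HOSTS.
KAFKA_HOSTS=${KAFKA_HOSTS:-"hadoop101 hadoop102 hadoop103"}
KAFKA_HOME=${KAFKA_HOME:-/opt/module/kafka}

# Print the command that would run on one node for the given action.
kafka_remote_cmd() {
  case $1 in
    start) echo "$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties" ;;
    stop)  echo "$KAFKA_HOME/bin/kafka-server-stop.sh" ;;
    *)     echo "usage: kafka_cluster start|stop" >&2; return 1 ;;
  esac
}

# Run the action on every host; with DRY_RUN=1 only print what would be run.
kafka_cluster() {
  local cmd host
  cmd=$(kafka_remote_cmd "$1") || return 1
  for host in $KAFKA_HOSTS; do
    if [ "${DRY_RUN:-0}" = "1" ]; then
      echo "ssh $host $cmd"
    else
      ssh "$host" "$cmd"
    fi
  done
}
```

Running it first with DRY_RUN=1 set, followed by kafka_cluster start, prints the commands without touching any node, which is a cheap way to check the host list before using it for real.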
Kafka command-line operations
The commands for creating, deleting, altering, and querying topics are all in kafka-topics.sh.
1) List all topics on the current server
$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list
2) Create a topic
$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
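Options:
--topic  the topic name
--replication-factor  the number of replicas
--partitions  the number of partitions
The topic's data is stored under the path set by log.dirs in server.properties. Each partition gets its own directory named <topic name>-<partition number>, for example first-0.
When creating a topic, the replication factor must be less than or equal to the number of brokers, while the number of partitions may exceed the number of brokers.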
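The replication-factor rule is easy to check before calling kafka-topics.sh. The sketch below is a hypothetical pre-flight check (the name check_replication is made up); the broker count has to be supplied by hand, since the sketch does not query the cluster.

```shell
# Hypothetical pre-flight check: succeed only if the requested replication
# factor can be satisfied by the given number of brokers.
# Usage: check_replication <replication-factor> <broker-count>
check_replication() {
  local rf=$1 brokers=$2
  if [ "$rf" -lt 1 ] || [ "$rf" -gt "$brokers" ]; then
    echo "replication factor $rf is not possible with $brokers broker(s)" >&2
    return 1
  fi
}

# Example use, assuming a three-broker cluster:
#   check_replication 3 3 && bin/kafka-topics.sh --zookeeper hadoop102:2181 \
#     --create --replication-factor 3 --partitions 1 --topic first
```

No such check is needed for the partition count, because several partitions can live on the same broker.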
3) Delete a topic
$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
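This requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion and is not actually removed (or you have to restart).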
4) Send messages
$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
After you enter the command, it blocks and waits for input; type the messages you want to send:
>hello world
>atguigu atguigu
5) Consume messages
$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first
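--from-beginning: reads all the data that already exists in the first topic. Whether to add this option depends on your use case.
When a consumer first starts, Kafka creates an internal topic dedicated to recording offsets. It has 50 partitions by default, and its folders are named "__consumer_offsets-n", where n is the partition number. Note that this applies to consumers that store their offsets in Kafka (started with --bootstrap-server); a consumer started with --zookeeper, as above, keeps its offsets in ZooKeeper.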
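To make the folder naming concrete, here is a small sketch that prints the directory names for any topic, given its partition count. The function name partition_dirs is made up for illustration, and the sketch only prints names; it does not look at the disk. On a real cluster each broker holds only the partitions assigned to it, so no single data folder is expected to contain all of them.

```shell
# Print the directory name of every partition of a topic, following the
# "<topic>-<partition number>" convention.
# Usage: partition_dirs <topic> <partition-count>
partition_dirs() {
  local topic=$1 count=$2 i=0
  while [ "$i" -lt "$count" ]; do
    echo "$topic-$i"
    i=$((i + 1))
  done
}

# partition_dirs __consumer_offsets 50 lists __consumer_offsets-0 through
# __consumer_offsets-49.
```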
6) Show the details of a topic
$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
7) Change the number of partitions (the count can only be increased, not reduced)
$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
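Since Kafka can add partitions to a topic but cannot remove them, a script that alters topics may want to guard against a smaller or equal value. A minimal sketch follows, with the hypothetical name can_grow_partitions; the current count is passed in by hand (for instance, read off the --describe output) and is not fetched from the cluster.

```shell
# Hypothetical guard: succeed only if the new partition count is larger than
# the current one, since Kafka can add partitions but not remove them.
# Usage: can_grow_partitions <current-count> <new-count>
can_grow_partitions() {
  local current=$1 new=$2
  if [ "$new" -le "$current" ]; then
    echo "cannot change partitions from $current to $new: the count can only increase" >&2
    return 1
  fi
}

# Example use for the command above, assuming the topic currently has 1 partition:
#   can_grow_partitions 1 6 && bin/kafka-topics.sh --zookeeper hadoop102:2181 \
#     --alter --topic first --partitions 6
```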