【1】Prerequisites
(1) The cluster needs at least 3 machines, and an odd number of nodes.
(2) Kafka requires a Java environment: JDK 1.8 or later.
(3) Installation package used in this walkthrough: kafka_2.13-2.8.0.tgz
(4) The three servers are named kafka1, kafka2, and kafka3:
Server name | IP | Domain
---|---|---
kafka1 | 192.168.175.132 | kafka1.sd.cn
kafka2 | 192.168.175.148 | kafka2.sd.cn
kafka3 | 192.168.175.147 | kafka3.sd.cn
I already have a JDK installed here, so I will skip that step.
【2】Building the ZooKeeper cluster
(From 2.8 on, Kafka can also run without ZooKeeper via KRaft mode, still early-access at this version; this walkthrough uses ZooKeeper.)
(2.1) Edit the ZooKeeper configuration file
tar -zxf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
mkdir zk_kfk_data
cd ../
mv kafka_2.13-2.8.0 /data/kafka
cd /data/kafka
cd config
vim zookeeper.properties
---------------------------------------------------------
dataDir=/data/kafka/zk_kfk_data
# Directory where ZooKeeper stores its log files
#dataLogDir=/data/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
server.1=192.168.175.132:2888:3888
server.2=192.168.175.148:2888:3888
server.3=192.168.175.147:2888:3888
--------------------------------------------
The zookeeper.properties file is identical on all three machines. dataDir is ZooKeeper's data directory, and server.1, server.2, server.3 describe the cluster members.
Port 2888 is used by ZooKeeper servers to communicate with each other (followers to the leader).
Port 3888 is used for leader election between ZooKeeper servers.
tickTime: the basic heartbeat interval.
It is the interval, in milliseconds, at which heartbeats are exchanged between ZooKeeper servers, and between clients and servers; one heartbeat is sent every tickTime.
tickTime also bounds session expiry: ZooKeeper has a session concept much like web development does, and the minimum session timeout is twice the tickTime.
initLimit: the leader/follower initial-connection limit.
The maximum number of heartbeats (tickTime intervals) a follower (F) may take to make its initial connection to the leader (L).
syncLimit: the leader/follower sync limit.
The maximum number of heartbeats (tickTime intervals) a follower (F) may take between a request to the leader (L) and its acknowledgement.
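To make these concrete, here is a small sketch (values taken from the zookeeper.properties above) showing the effective timeouts they imply:

```python
# Effective ZooKeeper timeouts implied by the settings above.
tick_time_ms = 2000   # tickTime: base heartbeat interval, in milliseconds
init_limit = 10       # initLimit: max ticks for a follower's initial connection
sync_limit = 5        # syncLimit: max ticks between a request and its ack

# A follower must finish its initial connection/sync within initLimit ticks:
init_timeout_ms = tick_time_ms * init_limit    # 20000 ms = 20 s
# A follower may lag the leader by at most syncLimit ticks:
sync_timeout_ms = tick_time_ms * sync_limit    # 10000 ms = 10 s
# The minimum client session timeout is twice the tickTime:
min_session_timeout_ms = 2 * tick_time_ms      # 4000 ms
```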
(2.2) Create the myid file
Go into /data/kafka/zk_kfk_data and create a file named myid, writing 1, 2, and 3 into it on the three servers respectively.
myid is the identifier ZooKeeper cluster members use to discover each other; it must exist and must be unique per node.
--------------
echo "1" > /data/kafka/zk_kfk_data/myid   # on kafka1
echo "2" > /data/kafka/zk_kfk_data/myid   # on kafka2
echo "3" > /data/kafka/zk_kfk_data/myid   # on kafka3
---------------------
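Since a wrong or duplicated myid is a common cause of startup failures (see the troubleshooting section at the end), here is a hypothetical sanity-check helper, assuming the server.N lines shown above:

```python
# Sanity check: the myid on each host must match one server.N entry
# in zookeeper.properties (hypothetical helper, not part of Kafka/ZooKeeper).
def check_myid(properties_text, myid, my_ip):
    """Return True if server.<myid> in the config points at my_ip."""
    for line in properties_text.splitlines():
        line = line.strip()
        if line.startswith("server.%d=" % myid):
            host = line.split("=", 1)[1].split(":", 1)[0]
            return host == my_ip
    return False

conf = """
server.1=192.168.175.132:2888:3888
server.2=192.168.175.148:2888:3888
server.3=192.168.175.147:2888:3888
"""
```

For example, on kafka1 (myid 1, IP 192.168.175.132) the check passes, while a myid of 2 on that same host would fail.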
(2.3) Set up environment variables
cd /data/kafka/bin/
echo "export PATH=${PATH}:`pwd`" >>/etc/profile
source /etc/profile
(2.4) Start the ZooKeeper cluster
nohup zookeeper-server-start.sh /data/kafka/config/zookeeper.properties >>/data/kafka/zookeeper.log &
less /data/kafka/zookeeper.log
Check the log for errors; if there are none, the node is up.
Then open ports 2181, 2888, and 3888 in the firewall.
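To confirm the firewall actually lets these ports through, here is a minimal probe sketch (a hypothetical helper, not a Kafka tool):

```python
import socket

def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g. verify all ZooKeeper ports on one node:
# all(port_open("192.168.175.132", p) for p in (2181, 2888, 3888))
```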
(2.5) Log in to verify
zookeeper-shell.sh 192.168.175.132:2181
If the shell connects and accepts commands, the cluster is working.
【3】Kafka cluster
(3.1) Edit the server.properties configuration file
(1) Enter the directory: cd /data/kafka
(2) Create the Kafka log data directory: mkdir kafka-logs
(3) Enter the config directory: cd /data/kafka/config
(4) Edit the server.properties configuration file
----------------------------------
broker.id=0
advertised.listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.175.132:2181,192.168.175.148:2181,192.168.175.147:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
------------------------
broker.id: must differ across the three nodes; set it to 0, 1, and 2 respectively.
advertised.listeners: the listener address advertised to clients; each broker should advertise its own reachable host here (e.g. PLAINTEXT://kafka1.sd.cn:9092).
log.dirs: the data (log segment) directory.
num.partitions: default number of partitions for new topics.
log.retention.hours: how long log data is retained.
zookeeper.connect: ZooKeeper connection string; separate multiple addresses with commas.
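Here is a small sketch of how the per-node differences could be generated rather than edited by hand on each machine; the host list is the one assumed throughout this walkthrough, and broker_overrides is a hypothetical helper:

```python
# Render the per-broker overrides for server.properties.
HOSTS = ["192.168.175.132", "192.168.175.148", "192.168.175.147"]

def broker_overrides(broker_id, host):
    """The two settings that must differ (or should be explicit) per node."""
    return (
        "broker.id=%d\n" % broker_id
        + "advertised.listeners=PLAINTEXT://%s:9092\n" % host
    )

# One overrides snippet per host, broker ids 0..2:
configs = {h: broker_overrides(i, h) for i, h in enumerate(HOSTS)}
```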
(3.2) Start the Kafka cluster
nohup kafka-server-start.sh /data/kafka/config/server.properties >>/data/kafka/kafka.log &
less /data/kafka/kafka.log
【4】Testing the Kafka cluster
(4.1) Create a topic: test
kafka-topics.sh --help
kafka-topics.sh --create --zookeeper 192.168.175.132:2181,192.168.175.148:2181,192.168.175.147:2181 --partitions 3 --replication-factor 1 --topic test
--replication-factor is the total number of copies including the leader: a factor of 1 means just the partition itself with no extra replicas, a factor of 2 adds one follower (in general, factor minus 1 followers), and 3 means 1 leader plus 2 followers.
Result:
Created topic test.
(4.2) List and describe the topic
kafka-topics.sh --list --zookeeper 192.168.175.147:2181
Result: test
kafka-topics.sh --describe --zookeeper 192.168.175.147:2181
(4.3) Simulate a producer and a consumer
Produce: kafka-console-producer.sh --bootstrap-server 192.168.175.132:9092,192.168.175.148:9092,192.168.175.147:9092 --topic test
Consume: kafka-console-consumer.sh --bootstrap-server 192.168.175.132:9092,192.168.175.148:9092,192.168.175.147:9092 --from-beginning --topic test
【5】Replica expansion/shrinking, partition migration, cross-path migration: kafka-reassign-partitions
For more detail, see: https://mp.weixin.qq.com/s/fQ03wpctV1dGnmk1r-xEWA
Script parameters:
Parameter | Description | Example
---|---|---
--zookeeper (deprecated in newer versions) | Connect via ZooKeeper | --zookeeper localhost:2181,localhost:2182
--bootstrap-server (use in newer versions) | Connect via Kafka | --bootstrap-server localhost:9092,x.x.x.x:9092
--topics-to-move-json-file | JSON file listing the topics to move |
--generate | Propose a reassignment plan; does not actually execute anything |
--broker-list | Broker list used when generating a plan; used with --generate | --broker-list 0,1,2,3
--reassignment-json-file | JSON file describing the reassignment; used with --execute (format shown in the examples below) |
--execute | Start the reassignment; used with --reassignment-json-file |
--verify | Check whether the reassignment finished; if --throttle was used, this also removes the throttle. This matters: a leftover throttle interferes with normal replica synchronization. |
--throttle | Limit on inter-broker transfer rate during migration, in bytes/sec | --throttle 500000
--replica-alter-log-dirs-throttle | Limit on intra-broker cross-path copy bandwidth (data copied from one log dir to another), in bytes/sec | --replica-alter-log-dirs-throttle 100000
--disable-rack-aware | Disable rack awareness; rack information is ignored during assignment |
--bootstrap-server | Required when doing cross-path replica migration |
(5.1) How to use the script
Key parameter: --generate
Step 1: construct the topics file
cd /data/kafka
vim move.json
{
"topics": [
{"topic": "test"}
],
"version": 1
}
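Writing move.json by hand is fine for one topic; for many topics it can be generated, e.g. with this small sketch (build_move_json is a hypothetical helper):

```python
import json

def build_move_json(topics):
    """Render a --topics-to-move-json-file body for the given topic names."""
    return json.dumps(
        {"topics": [{"topic": t} for t in topics], "version": 1},
        indent=2,
    )

move = build_move_json(["test"])
# Write it out with: open("move.json", "w").write(move)
```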
You can obtain the broker-list value with:
[root@DB6 /data/kafka]$ kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
Dynamic configs for broker 0 are:
Dynamic configs for broker 1 are:
Dynamic configs for broker 2 are:
Default configs for brokers in the cluster are:
From this output, the cluster's broker list is 0,1,2.
Now run --generate to print the current replica assignment JSON and a proposed new assignment JSON:
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file ./move.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
Pretty-print both JSON blobs (e.g. with an online formatter) and compare them:
the replicas for partitions 1 and 2 have changed.
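Instead of eyeballing the two JSON blobs, the changed partitions can be computed. A sketch using the Current/Proposed output shown above (changed_partitions is a hypothetical helper; log_dirs is omitted for brevity):

```python
import json

CURRENT = """{"version":1,"partitions":[
 {"topic":"test","partition":0,"replicas":[1,0,2]},
 {"topic":"test","partition":1,"replicas":[0,2,1]},
 {"topic":"test","partition":2,"replicas":[2,1,0]}]}"""
PROPOSED = """{"version":1,"partitions":[
 {"topic":"test","partition":0,"replicas":[1,0,2]},
 {"topic":"test","partition":1,"replicas":[2,1,0]},
 {"topic":"test","partition":2,"replicas":[0,2,1]}]}"""

def changed_partitions(cur_json, new_json):
    """Return the (topic, partition) pairs whose replica list would change."""
    cur = {(p["topic"], p["partition"]): p["replicas"]
           for p in json.loads(cur_json)["partitions"]}
    new = {(p["topic"], p["partition"]): p["replicas"]
           for p in json.loads(new_json)["partitions"]}
    return sorted(k for k in cur if new.get(k) != cur[k])

moved = changed_partitions(CURRENT, PROPOSED)  # partitions 1 and 2 differ
```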
(5.2) Execute the JSON file
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./reassignment.json --execute
Warning: --zookeeper is deprecated, and will be removed in a future version of Kafka.
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for test-0,test-1,test-2
Comparing the describe output before and after: the replica order clearly changed, but the ISR did not.
Watch out for traffic spikes hitting the cluster during migration. Kafka provides a throttle on inter-broker replication traffic, capping the bandwidth used to move replicas from one machine to another.
This is useful when rebalancing the cluster, bootstrapping a new broker, or adding/removing brokers,
because it limits these data-intensive operations and so bounds their impact on users. For example, add a throttle option to the migration above:
kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./reassignment.json --execute --throttle 50000000
With --throttle 50000000 appended, the partition movement is rate-limited to 50000000 B/s.
With the flag added, you will see:
The throttle limit was set to 50000000 B/s Successfully started reassignment of partitions.
Note that if your migration includes cross-path moves (multiple log dirs on the same broker), this throttle does not apply to them;
for those you also need --replica-alter-log-dirs-throttle,
which limits migration between different paths within the same broker.
(5.3) Verify
Key parameter: --verify
This option checks the status of the partition reassignment, and it also removes any --throttle limit;
otherwise the throttle may keep constraining normal replication traffic.
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --verify
If the topic does not exist, the command reports an error.
【6】Shrinking replicas in practice
When the new assignment has fewer replicas than before, the extra replicas are deleted. For example, I currently have 3 replicas and want to go back to 2, dropping the replica on the broker with id 2.
(6.1) Use --generate (as in 5.1) to obtain the current assignment JSON
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file ./move.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
Transform that into reassignment.json:
remove every 2 from the replicas lists, and shrink each log_dirs array to 2 entries accordingly.
{
  "version": 1,
  "partitions": [
    { "topic": "test", "partition": 0, "replicas": [1, 0], "log_dirs": ["any", "any"] },
    { "topic": "test", "partition": 1, "replicas": [1, 0], "log_dirs": ["any", "any"] },
    { "topic": "test", "partition": 2, "replicas": [0, 1], "log_dirs": ["any", "any"] }
  ]
}
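The shrink plan can also be derived mechanically from the current assignment: drop broker 2 from every replica list and shorten log_dirs to match. A sketch (drop_broker is a hypothetical helper; the input is the Current assignment from the --generate run above, with log_dirs omitted for brevity):

```python
import json

def drop_broker(assignment, broker):
    """Remove one broker from every replica list, keeping log_dirs in sync."""
    out = {"version": assignment["version"], "partitions": []}
    for p in assignment["partitions"]:
        replicas = [r for r in p["replicas"] if r != broker]
        out["partitions"].append({
            "topic": p["topic"],
            "partition": p["partition"],
            "replicas": replicas,
            "log_dirs": ["any"] * len(replicas),  # one entry per replica
        })
    return out

current = json.loads('{"version":1,"partitions":['
    '{"topic":"test","partition":0,"replicas":[1,0,2]},'
    '{"topic":"test","partition":1,"replicas":[2,1,0]},'
    '{"topic":"test","partition":2,"replicas":[0,2,1]}]}')
plan = drop_broker(current, 2)
```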
(6.2) Execute the shrink
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --execute --throttle 50000000
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Warning: You must run --verify periodically, until the reassignment completes, to ensure the throttle is removed.
The inter-broker throttle limit was set to 50000000 B/s
Successfully started partition reassignments for test-0,test-1,test-2
-- As advised, run --verify to confirm completion and remove the throttle:
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --verify
Status of partition reassignment:
Reassignment of partition test-0 is complete.
Reassignment of partition test-1 is complete.
Reassignment of partition test-2 is complete.
Clearing broker-level throttles on brokers 0,1,2
Clearing topic-level throttles on topic test
-- After the shrink, everything looks fine.
Comparing the topic state before and after the operation: success.
Checking the broker-2 node confirms the partition directories are indeed gone.
【7】Expanding replicas in practice
Add one replica to each partition.
(7.1) Use --generate (as in 5.1) to obtain the current assignment JSON
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file ./move.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[0,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[0,1],"log_dirs":["any","any"]}]}
Transform that into reassignment.json:
{
  "version": 1,
  "partitions": [
    { "topic": "test", "partition": 0, "replicas": [2, 1, 0], "log_dirs": ["any", "any", "any"] },
    { "topic": "test", "partition": 1, "replicas": [1, 0, 2], "log_dirs": ["any", "any", "any"] },
    { "topic": "test", "partition": 2, "replicas": [0, 1, 2], "log_dirs": ["any", "any", "any"] }
  ]
}
(7.2) Execute the expansion
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[0,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for test-0,test-1,test-2
[root@DB6 /data/kafka]$ kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file ./reassignment.json --verify
Status of partition reassignment:
Reassignment of partition test-0 is complete.
Reassignment of partition test-1 is complete.
Reassignment of partition test-2 is complete.
Clearing broker-level throttles on brokers 0,1,2
Clearing topic-level throttles on topic test
Comparing the results: the replicas are back, but the leaders are unbalanced;
run a preferred leader election:
[root@DB6 /data/kafka]$ kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions
Successfully completed leader election (PREFERRED) for partitions test-1, test-0
The leaders are now evenly distributed. Done.
【8】Cross-path replica migration
Why does disk usage on Kafka machines often become lopsided across disks? Kafka only guarantees that the number of partitions is spread evenly across disks; it does not know how much space each partition actually uses, so a partition with a huge message volume can consume a disproportionate amount of one disk. Before version 1.1 there was nothing users could do about this, because Kafka only supported reassigning partition data between brokers, not between disks of the same broker. Version 1.1 added official support for migrating replicas between log paths.
How do you store partitions under multiple paths on one broker?
Simply list multiple directories in the configuration:
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=kafka-logs-5,kafka-logs-6,kafka-logs-7,kafka-logs-8
Note that different paths on the same broker only hold different partitions; two replicas of the same partition are never placed on the same broker, since that would defeat the purpose of replication (fault tolerance).
How do you do a cross-path migration?
The reassignment JSON has a log_dirs field; if omitted, it defaults to "log_dirs": ["any"]
(the array length must match the number of replicas). To migrate across paths, put absolute paths there instead, as in the example below.
Example migration JSON file:
{
"version": 1,
"partitions": [{
"topic": "test_create_topic4",
"partition": 2,
"replicas": [0],
"log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-5"]
}, {
"topic": "test_create_topic4",
"partition": 1,
"replicas": [0],
"log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-6"]
}]
}
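One easy mistake here is a log_dirs array whose length does not match the replicas list. A small validation sketch (validate_plan is a hypothetical helper; the example data mirrors the JSON above, with paths adapted):

```python
def validate_plan(plan):
    """Check a reassignment plan: one log_dirs entry per replica,
    each entry either "any" or an absolute path."""
    for p in plan["partitions"]:
        dirs = p.get("log_dirs", ["any"] * len(p["replicas"]))
        if len(dirs) != len(p["replicas"]):
            return False
        if not all(d == "any" or d.startswith("/") for d in dirs):
            return False
    return True

good = {"version": 1, "partitions": [
    {"topic": "test_create_topic4", "partition": 2, "replicas": [0],
     "log_dirs": ["/data/kafka/kafka-logs-5"]}]}
bad = {"version": 1, "partitions": [
    {"topic": "test_create_topic4", "partition": 1, "replicas": [0, 1],
     "log_dirs": ["any"]}]}  # 2 replicas but only 1 log_dirs entry
```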
Then run the script:
sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx --reassignment-json-file config/reassignment-json-file.json --execute --bootstrap-server xxxxx:9092 --replica-alter-log-dirs-throttle 10000
Note: --bootstrap-server is mandatory for cross-path migration.
To limit the rate, add --replica-alter-log-dirs-throttle;
unlike --throttle, it limits migration traffic between different paths within a broker.
【9】Expanding/shrinking topic partitions
(9.1) Shrinking partitions: not allowed
[root@DB6 /data/kafka]$ kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic test --partitions 2
Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 2.
[2021-09-03 18:03:17,386] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 2. (kafka.admin.TopicCommand$)
So shrinking directly does not work; you can, however, shrink replicas via the approach in section 【6】.
(9.2) Expanding partitions: allowed
(My own run:)
Topic: test2  TopicId: yF8LT56YSXyB3U0PLzmUBQ  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: test2  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
  Topic: test2  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
Add partitions:
[root@DB6 /data/kafka]$ kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic test2 --partitions 3
Topic: test2  TopicId: yF8LT56YSXyB3U0PLzmUBQ  PartitionCount: 3  ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: test2  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
  Topic: test2  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
  Topic: test2  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
Add partitions, pinned to specific brokers:
[root@DB6 /data/kafka]$ kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic test2 --partitions 6 --replica-assignment 0,1,2,0,2,2
[root@DB6 /data/kafka]$ kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic "test2"
Topic: test2  TopicId: yF8LT56YSXyB3U0PLzmUBQ  PartitionCount: 6  ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: test2  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
  Topic: test2  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
  Topic: test2  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
  Topic: test2  Partition: 3  Leader: 0  Replicas: 0  Isr: 0
  Topic: test2  Partition: 4  Leader: 2  Replicas: 2  Isr: 2
  Topic: test2  Partition: 5  Leader: 2  Replicas: 2  Isr: 2
(9.3) Partition migration
For partition migration, see sections 【5】, 【6】, and 【7】.
Reference commands:
ZooKeeper style (not recommended):
>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
For Kafka >= 2.2, the following style is supported (recommended):
Expand a single topic:
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4
Batch expansion (expands every topic matched by the regular expression to 4 partitions):
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4
The regular expression ".*?" here matches every topic; adjust it as needed.
PS: if a topic already has at least the requested number of partitions, the command throws an exception for that topic, but the other topics are processed normally.
Related optional parameter:

Parameter | Description | Example
---|---|---
--replica-assignment | Manual replica placement: specify the replica layout yourself when creating a topic or adding partitions | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 (three partitions with three replicas each; commas separate partitions, colons separate replicas)
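The string format is easy to get wrong, so here is a tiny sketch that builds it from per-partition replica lists (assignment_string is a hypothetical helper):

```python
def assignment_string(partitions):
    """Build a --replica-assignment value: commas separate partitions,
    colons separate the replica broker ids within a partition."""
    return ",".join(":".join(str(b) for b in replicas)
                    for replicas in partitions)

# 3 partitions x 3 replicas, as in the example above:
s = assignment_string([[0, 1, 2], [1, 2, 0], [2, 1, 0]])
```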
PS: although this flag describes the assignment for all partitions, only the newly added partitions take effect. For example, a topic with 3 partitions and 1 replica used to look like this:

Broker-1 | Broker-2 | Broker-3 | Broker-4
---|---|---|---
0 | 1 | 2 |

Now add one partition with --replica-assignment 2,1,3,4; read literally, that looks like it would swap partitions 0 and 1 between brokers:

Broker-1 | Broker-2 | Broker-3 | Broker-4
---|---|---|---
1 | 0 | 2 | 3

But that is not what happens: the Controller truncates the first three entries and only applies the assignment for the newly added partition, leaving the existing ones unchanged:

Broker-1 | Broker-2 | Broker-3 | Broker-4
---|---|---|---
0 | 1 | 2 | 3
【Troubleshooting】
(1) serverid null is not a number
In the dataDir specified in the ZooKeeper config file there is a myid file; its value must not be empty, and it must not duplicate another ZooKeeper node's id.
Fix the value and restart.