https://www.99ya.net/archives/517
kafka 扩容 与 partitions 增加
kafka 扩容
* kafka的扩容难点:
1)主要在于增加机器之后,数据需要rebalance到新增的空闲节点,即把partitions迁移到空闲机器上。kafka提供了bin/kafka-reassign-partitions.sh工具,完成parttition的迁移。
2)kafka的集群的数据量加大,数据rebalance的时间较长。解决办法是把log.retention.hours=1设置一小时(生产参数24小时)。修改参数之后,重启kakfa节点,kafka会主动purge 1小时之前的log数据。
* 扩容过程
1. 验证kafka节点是否正常加入集群。
zkCli.sh > ls /kafka/brokers/ids/
2. purge数据,使数据迁移更加快速
1) 替换retent.time,只保留最近一个小时的数据。主要是为了方面topic数据快速迁移。
sed -i 's/log.retention.hours=24/log.retention.hours=1/g' /apps/conf/kafka/server.properties
2)关闭kafka
jps -ml |grep 'kafka.Kafka' | awk '{print $1}' |xargs kill -9
3)验证kafka进程
jps -ml |grep 'kafka.Kafka' | awk '{print $1}'
4)启动
kafka-server-start.sh -daemon server.properties
jps -ml |grep 'kafka.Kafka' | awk '{print $1}'
3. 增加partitions
因为增加节点,物理机器机器更多,需要增加partition的个数。
bin/kafka-topics.sh --zookeeper 10.1.11.6:2181 --alter --topic all --partitions 24
4. 重新分配parttion(reassign partitions)
1)获取所有的topic
kafka-topics.sh --list --zookeeper 10.1.11.6:2181
2) reassign partitions**
生成需要迁移的topic partitions信息,broker-list为所有的节点,包括新增节点。
./bin/kafka-reassign-partitions.sh --broker-list "1,2,3" --topics-to-move-json-file move.json --zookeeper 10.1.11.6:2181 --generate
其中topics的json文件内容为:
{"topics": [{"topic": "logstash-product"}], "version":1 }
3)使用上一步生成的建议partition json内容进行完成迁移
“Proposed partition reassignment configuration”后面的内容保存到reassign.json文件中
bin/kafka-reassign-partitions.sh --broker-list "1,2,3" --reassignment-json-file reassign.json --zookeeper 10.1.11.6:2181 --execute
4)修改参数,重启kafka
sed -i 's/log.retention.hours=1/log.retention.hours=24/g' /apps/conf/kafka/server.properties
至此 kafka 扩容完毕