注:
适用于 Kafka 集群在线扩容、缩容时的分区重新分配。
cat kafka_partition_reassignment.sh
#!/bin/bash
#
# Reassigns the partitions of Kafka topics, one topic at a time, onto the
# brokers listed in reassignment_broker_ids. Each topic's reassignment is
# polled with --verify until no partition is reported as in progress before
# the next topic is started.
#
# Usage: kafka_partition_reassignment.sh [topic ...]
#   no arguments : process every topic printed by `kafka-topics.sh --list`,
#                  except lines matching __consumer_offsets
#   topic ...    : process only the named topics
#
# Files written in the current directory: move.json (topic selection for
# --generate) and part.json (the generated plan, reused by --execute/--verify).
#
# Exit status: 0 when every topic was processed without a detected problem,
# 1 when the topic list could not be fetched or any topic was skipped or
# could not be verified.

KAFKA_BIN="/app/kafka_2.12-2.1.0/bin"
zk_server="127.0.0.1:2181"
topics_to_move_json_file="move.json"
reassignment_json_file="part.json"
# Broker IDs the partitions are spread across: list the cluster's broker IDs;
# add IDs when scaling out, remove IDs when scaling in.
reassignment_broker_ids="200,201,202,203,204,205,206,207,208"
poll_interval_seconds=10
max_verify_polls=10000

# Print a diagnostic on stderr.
warn() {
  printf '%s\n' "$*" >&2
}

# Run kafka-reassign-partitions.sh against the configured ZooKeeper,
# passing through any extra arguments.
run_reassign() {
  "$KAFKA_BIN/kafka-reassign-partitions.sh" --zookeeper "$zk_server" "$@"
}

#######################################
# Generate, execute and wait for the reassignment of a single topic.
# Globals:   topics_to_move_json_file, reassignment_json_file,
#            reassignment_broker_ids, poll_interval_seconds,
#            max_verify_polls (all read)
# Arguments: $1 - topic name
# Outputs:   tool output and in-progress verify reports on stdout,
#            diagnostics on stderr
# Returns:   0 when the wait loop ended with nothing in progress and no
#            problem was detected, 1 otherwise
#######################################
reassign_topic() {
  local topic=$1
  local status=0
  local poll verify_output in_progress

  cat >"$topics_to_move_json_file" <<EOF
{"topics": [{"topic": "$topic"}],
"version":1
}
EOF

  # The last line of the --generate output is taken as the proposed plan.
  run_reassign --topics-to-move-json-file "$topics_to_move_json_file" \
    --broker-list "$reassignment_broker_ids" --generate \
    | tail -1 >"$reassignment_json_file"

  # If --generate failed, the last line is empty or an error message; do not
  # hand that to --execute.
  if ! grep -q '"partitions"' "$reassignment_json_file"; then
    warn "skipping topic '$topic': --generate did not produce a reassignment plan"
    return 1
  fi

  # Still fall through to the wait loop on a non-zero exit: part of the plan
  # may have been submitted, and the next topic must not start while a
  # reassignment is running.
  if ! run_reassign --reassignment-json-file "$reassignment_json_file" --execute; then
    warn "topic '$topic': --execute exited with a non-zero status"
    status=1
  fi

  sleep "$poll_interval_seconds"
  for ((poll = 1; poll <= max_verify_polls; poll++)); do
    # One --verify call per poll; the captured report is reused for display.
    verify_output=$(run_reassign --reassignment-json-file "$reassignment_json_file" --verify)
    # Count report lines whose last word contains "progress".
    in_progress=$(awk '{print $NF}' <<<"$verify_output" | grep -c "progress")
    if [ "$in_progress" -eq 0 ]; then
      # NOTE(review): Kafka 2.1.0 is expected to print one
      # "Reassignment of partition ..." line per partition; a report with
      # none means --verify itself failed rather than the work being done.
      if ! grep -q 'Reassignment of partition' <<<"$verify_output"; then
        warn "topic '$topic': --verify returned no partition status"
        status=1
      fi
      return "$status"
    fi
    printf '%s\n' "$verify_output"
    sleep "$poll_interval_seconds"
  done

  warn "topic '$topic': still in progress after $max_verify_polls polls"
  return 1
}

if [ "$#" -gt 0 ]; then
  # Only the topics named on the command line.
  topic_list="$*"
else
  # Every topic in the cluster. The listing is captured on its own so that a
  # failing kafka-topics.sh is not mistaken for a cluster with no topics.
  if ! all_topics=$("$KAFKA_BIN/kafka-topics.sh" --list --zookeeper "$zk_server"); then
    warn "failed to list topics via $KAFKA_BIN/kafka-topics.sh"
    exit 1
  fi
  topic_list=$(grep -v __consumer_offsets <<<"$all_topics")
fi

failed_topics=0
# Word splitting of $topic_list is intentional: one topic per word.
for topic in $topic_list; do
  reassign_topic "$topic" || failed_topics=$((failed_topics + 1))
done

echo "查看进度:$KAFKA_BIN/kafka-reassign-partitions.sh --zookeeper $zk_server --reassignment-json-file $reassignment_json_file --verify"

if [ "$failed_topics" -gt 0 ]; then
  warn "$failed_topics topic(s) were skipped or could not be verified"
  exit 1
fi