在kafka中增加新的节点后,数据是不会自动迁移到新的节点上的,需要我们手动将数据迁移(或者成为打散)到新的节点上
1 迁移方法
kafka为我们提供了用于数据迁移的脚本。我们可以用这些脚本完成数据的迁移。
1.1 生成partiton分配表
1.1.1 创建json文件topic-to-move.json
{
"topics": [{"topic": "testTopic"}],
"version": 1
}
1.1.2 生成partiton分配表
运行
$ ./kafka-reassign-partitions --zookeeper ${zk_address} --topics-to-move-json-file topic-to-move.json --broker-list "140,141" --generate
其中${zk_address}是kafka所连接zk地址,"140,141"为该topic将要迁移到的目标节点。
生成结果如下:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"testTopic","partition":1,"replicas":[61,62]},{"topic":"testTopic","partition":0,"replicas":[62,61]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"testTopic","partition":1,"replicas":[140,141]},{"topic":"testTopic","partition":0,"replicas":[141,140]}]}
其中上半部分是当前的partiton分布情况,下半部分是迁移成功后的partion分布情况。那么我们就用下部分的json进行迁移。另外,也可以自己构造个类似的json文件,同样可以进行迁移。这里我们使用脚本为我们生成的json文件,将下半部分的json保存为expand-cluster-reassignment.json
1.2 执行迁移
$ ./kafka-reassign-partitions --zookeeper ${zk_address} --reassignment-json-file expand-cluster-reassignment.json --execute
1.3 查看迁移进度
$ ./kafka-reassign-partitions --zookeeper ${zk_address} --reassignment-json-file expand-cluster-reassignment.json --verify
2 源码分析
2.1 脚本调用
kafka-reassign-partitions.sh会调用kafka.admin.ReassignPartitionsCommand.scala,在代码运行过程中抛出的任何异常都会通过标准输出打印出来,所以如果执行该脚本报错,可以看下这块代码来定位问题。
def main(args: Array[String]): Unit = {
// 略
try {
if(opts.options.has(opts.verifyOpt)) // 校验
verifyAssignment(zkUtils, opts)
else if(opts.options.has(opts.generateOpt)) // 生成json
generateAssignment(zkUtils, opts)
else if (opts.options.has(opts.executeOpt)) // 执行迁移
executeAssignment(zkUtils, opts)
} catch {
case e: Throwable =>
println("Partitions reassignment failed due to " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
val zkClient = zkUtils.zkClient
if (zkClient != null)
zkClient.close()
}
2.1.1 executeAssignment
executeAssignment 用于执行迁移。
def executeAssignment(zkUtils: ZkUtils,reassignmentJsonString: String){
// 略,做一些校验和去重等工作
// 获取当前的partition分布情况
zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
println("Current partition replica assignment
%s
Save this to use as the --reassignment-json-file option during rollback"
.format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
// 重点,执行迁移,z即将json写到zk上,准确的说是写到"/admin/reassign_partitions"下
// start the reassignment
if(reassignPartitionsCommand.reassignPartitions())
println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
else
println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
}
executeAssignment将json写到zk上后,brokerwatch到节点数据变化就开始进行迁移了
2.1.2 verifyAssignment
verifyAssignment用于校验迁移进度
def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
// 略
println("Status of partition reassignment:")
val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned) // 重点
reassignedPartitionsStatus.foreach { partition =>
partition._2 match {
case ReassignmentCompleted =>
println("Reassignment of partition %s completed successfully".format(partition._1))
case ReassignmentFailed =>
println("Reassignment of partition %s failed".format(partition._1))
case ReassignmentInProgress =>
println("Reassignment of partition %s is still in progress".format(partition._1))
}
}
}
private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
:Map[TopicAndPartition, ReassignmentStatus] = {
val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas) // 从zk节点"/admin/reassign_partitions"读取迁移信息
partitionsToBeReassigned.map { topicAndPartition =>
(topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkUtils,topicAndPartition._1,
topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
}
}
def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
reassignedReplicas: Seq[Int],
partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
val newReplicas = partitionsToBeReassigned(topicAndPartition)
partitionsBeingReassigned.get(topicAndPartition) match {
case Some(partition) => ReassignmentInProgress // 如果tp对应的数据存在则说明还在迁移
case None => // 否则可能是成功了
// check if the current replica assignment matches the expected one after reassignment
val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
if(assignedReplicas == newReplicas) // 重点,如果节点不存在了,但是迁移后的replica列表和预期不一致,则报错
ReassignmentCompleted
else { // 经常遇到的报错
println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
" for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition))
ReassignmentFailed
}
}
}
从源码中可以看出判断迁移是否完成是根据"/admin/reassign_partitions"是否存在来判断。如果节点不存在了,并且迁移后的AR和预期一致,则才算成功。
注意:在实际迁移中遇到过好几次报错类似如下,即上面代码的打印的日志
don't match the list of replicas for reassignment
从代码中可以看到出现这个错误的原因是"/admin/reassign_partitions"不存在了,但是当前topic的AR和预期的不一致。这个原因一般是由于迁移的时候broker那边报错了,然后将节点删除了,并没有进行迁移。具体原因需要看下broker的controller的日志。
2.2 broker如何进行迁移
2.2.1 入口
broker的controller节点负责partiton的迁移工作,在broker被选为controller节点的时候会watch "/admin/reassign_partitions" 节点的变化。
private def registerReassignedPartitionsListener() = {
zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
所以迁移的工作主要在partitionReassignedListener中,controller watch到"/admin/reassign_partitions"节点数据变化后,会读取该数据内容,并跳过正在删除的partiton,进行迁移工作。
class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
val zkUtils = controller.controllerContext.zkUtils
val controllerContext = controller.controllerContext
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString) // 读取"/admin/reassign_partitions"节点内的数据,封装成[TopicAndPartition, relipcs] 的形式
val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
}
partitionsToBeReassigned.foreach { partitionToBeReassigned => // 迁移每一个partiton
inLock(controllerContext.controllerLock) {
if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) { // 正在删除的则跳过
error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
.format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
} else {
val context = new ReassignedPartitionsContext(partitionToBeReassigned._2) // 将目标replica列表封装成ReassignedPartitionsContext
controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context) // 重点,以上都是读取,这里才是真正的迁移工作
}
}
}
}
2.2.2 KafkaController#initiateReassignReplicasForTopicPartition()
initiateReassignReplicasForTopicPartition进行迁移工作。但是他主要做一些校验工作,该方法中会watch该partiton的ISR变化情况,即监听“/brokers/topics/{topic}/partitions/{partiton}/state” 节点的变化, 这和迁移的原理有关系。
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val newReplicas = reassignedPartitionContext.newReplicas
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) // 根据broker的存活进行过滤
try {
// 重点, 从controllerContext中读取partition的AR
val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
// 如果ontrollerContext中AR和目标迁移列表相同,则抛异常。注意他们都是Seq类型,相同是指顺序也相同。
if(assignedReplicas == newReplicas) {
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
if(aliveNewReplicas == newReplicas) { // 目标列表里的replic的broker都存活才能进行迁移
watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext) // 重点,后面会分析
controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
onPartitionReassignment(topicAndPartition, reassignedPartitionContext) // 重点,真正干活的,做迁移工作的
} else { // 有不存活的,则抛出异常。
// some replica in RAR is not alive. Fail partition reassignment
throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
" %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
"Failing partition reassignment")
}
}
case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
.format(topicAndPartition))
}
} catch {
case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
// remove the partition from the admin path to unblock the admin client
removePartitionFromReassignedPartitions(topicAndPartition) // 重点,一点迁移出问题,抛出异常,则会将"/admin/reassign_partitions"里的相应信息清空或者删除节点
}
}
2.2.3 KafkaController#onPartitionReassignment
真正的迁移步骤是在onPartitionReassignment完成的
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match { // 判断目标迁移的replic是不是都在ISR中,都在的意思是目标迁移的replic列表是ISR的一个子集。例如ISR列表是[1,2,3],目标迁移列表是[2,3],则为true。
// 如果是false,则会将AR和目标replic列表做个并集,类似于增加该partiton的副本数。例如AR是[1,2,3], 目标是[2,3,4],则会将partiton的AR设置为[1, 2, 3, 4], 相当于partiton增加了一个新的replic。
case false =>
val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet // AR和目标replica列表求并集
// 更新AR到zk和缓存
updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
// 发送LeaderAndIsr
updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
newAndOldReplicas.toSeq)
// 让新增加的replic上线,使其开始从leader同步数据。
startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
"reassigned to catch up with the leader")
// 如果是true,则将不再目标列表中的AR中的replic去掉,例如目标迁移是[2,3],AR是[1,2,3], 则将1下线
case true =>
//4. Wait until all replicas in RAR are in sync with the leader.
val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
//5. replicas in RAR -> OnlineReplica
reassignedReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
replica)), OnlineReplica)
}
// 目标列表中没有leader则需要重新选下leader
moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
// 以下是停用掉下掉的replica的一些工作,例如更新AR,更新zk,发送meta请求等。另外删除"/admin/reassign_partitions"节点数据
stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
removePartitionFromReassignedPartitions(topicAndPartition)
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
}
}
可以到这里会有个疑问,目标迁移列表不是ISR的子集,就只是增加了replic,并没有去掉replic的步骤啊。这里的关键是在initiateReassignReplicasForTopicPartition中watch了partiton的ISR情况,即调用了watchIsrChangesForReassignedPartition方法。
2.2.4 watchIsrChangesForReassignedPartition
该方法监听/brokers/topics/{topic}/partitions/{partiton}/state” 节点的变化。如果目标迁移列表已经跟上leader了,那么就会将不在目标迁移列表里的replic下线,完成迁移
def handleDataChange(dataPath: String, data: Object) {
inLock(controllerContext.controllerLock) {
debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
val topicAndPartition = TopicAndPartition(topic, partition)
try {
controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
case Some(reassignedPartitionContext) =>
val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
newLeaderAndIsrOpt match {
case Some(leaderAndIsr) => // check if new replicas have joined ISR
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet // 求并集ISR和目标迁移列表的并集
if(caughtUpReplicas == reassignedReplicas) { // 目标迁移列表全部跟上了,则再次调用KafkaController#onPartitionReassignment,这次会走true那个判断分支了,会将不再目标replic列表中的replic下线。
// resume the partition reassignment process
info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Resuming partition reassignment")
controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
}
else {
info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
}
case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
.format(topicAndPartition, reassignedReplicas.mkString(",")))
}
case None =>
}
} catch {
case e: Throwable => error("Error while handling partition reassignment", e)
}
}
}
3 broker 处理迁移的思路总结
从以上分析我们可以看出,broker会watch "/admin/reassign_partitions"节点。当发现有迁移任务的时候,会将partiton的AR进行扩展,例如原先partiton的AR是[1, 2], 现在要迁移到[2, 3],那么partiton会先将AR扩展到[1, 2, 3],并监控ISR的变化。
当replica-2和replica-3都跟上后,即在ISR中的时候,表明新的repica-3已经和leader数据同步了。这个时候就可以将replica-1剔除了,最后得到迁移结果是[2, 3]。即迁移是一个先增加再减少的过程。
4 可能遇到的问题
4.1 报错
Assigned replicas (0,1) don't match the list of replicas for reassignment (1,0) for partition [testTopic,0] Reassignment of partition [testTopic,0] failed
在2.1.2中已经说明了,该错误是由于"/admin/reassign_partitions"节点已经被删除了,但是AR和目标迁移列表不相同报的错,一般需要看下controller的日志,看下controller在迁移过程中是不是抛出了异常。
4.2 迁移一直在进行中,不能完成
迁移需要等目标迁移列表中的replic都跟上了leader才能完成,目前迁移列表一直跟不上,那么就不会完成。可以看下zk中“/brokers/topics/{topic}/partitions/{partiton}/state”,注意下目标迁移列表是不是在isr中,如果不在说明要迁移的replic还没有完成从leader拉取数据。具体为甚么没有拉取成功,可能是数据量比较大,拉取需要一定的时间;也可能是其他原因比如集群宕机了等,需要具体分析下