Kafka partition migration: method and internals

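    After new brokers are added to a Kafka cluster, existing data is not migrated to them automatically; you have to migrate (or "spread") the data onto the new brokers manually.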

    1 Migration method

    Kafka provides a script for data migration, and we can use it to carry out the migration.

    1.1 Generate the partition assignment

    1.1.1 Create the JSON file topic-to-move.json

    {
      "topics": [{"topic": "testTopic"}],
      "version": 1
    }
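    If several topics are moved at once, the file simply lists more entries under "topics". As a minimal sketch, the Scala object below builds this content from a list of topic names; TopicsToMoveJson and build are hypothetical names, and topic names are assumed to need no JSON escaping.

```scala
object TopicsToMoveJson {
  // Builds the content of topic-to-move.json for the given topics.
  // Assumes topic names contain no characters that need JSON escaping.
  def build(topics: Seq[String]): String = {
    val entries = topics.map(t => "{\"topic\": \"" + t + "\"}")
    "{\"topics\": [" + entries.mkString(", ") + "], \"version\": 1}"
  }

  def main(args: Array[String]): Unit =
    println(build(Seq("testTopic", "otherTopic")))
}
```

    Redirect the printed string into topic-to-move.json and pass that file to --topics-to-move-json-file.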
    

    1.1.2 Generate the partition assignment

    Run:

    $ ./kafka-reassign-partitions --zookeeper ${zk_address} --topics-to-move-json-file  topic-to-move.json --broker-list "140,141" --generate
    

    where ${zk_address} is the ZooKeeper address that Kafka connects to, and "140,141" lists the target brokers the topic will be moved to.

    The output looks like this:

    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"testTopic","partition":1,"replicas":[61,62]},{"topic":"testTopic","partition":0,"replicas":[62,61]}]}
    
    Proposed partition reassignment configuration
    
    {"version":1,"partitions":[{"topic":"testTopic","partition":1,"replicas":[140,141]},{"topic":"testTopic","partition":0,"replicas":[141,140]}]}
    

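    The first part is the current partition distribution and the second part is the distribution after a successful migration, so the second JSON is the one to migrate with. You can also write a similar JSON file by hand and use it in the same way. Here we use the generated JSON: save the second part as expand-cluster-reassignment.json.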
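    For intuition about where such a proposal comes from, here is a simplified round-robin sketch. It is not Kafka's exact algorithm: the real --generate also randomizes the start index and shifts the placement of followers, so its proposals can differ. The names are hypothetical, and the start index is fixed at 1 only so that the output matches the sample proposal above.

```scala
object ProposeAssignment {
  /** Simplified round-robin proposal: partition p starts at broker (p + startIndex) % n
    * and its remaining replicas follow on consecutive brokers. Not Kafka's exact algorithm. */
  def propose(partitions: Int, replicationFactor: Int,
              brokers: IndexedSeq[Int], startIndex: Int): Map[Int, Seq[Int]] = {
    require(replicationFactor <= brokers.size, "replication factor larger than broker count")
    (0 until partitions).map { p =>
      val first = (p + startIndex) % brokers.size
      p -> (0 until replicationFactor).map(j => brokers((first + j) % brokers.size))
    }.toMap
  }

  /** Renders an assignment in the {"version":1,"partitions":[...]} shape used by the tool. */
  def toJson(topic: String, assignment: Map[Int, Seq[Int]]): String = {
    val parts = assignment.toSeq.sortBy(_._1).map { case (p, replicas) =>
      "{\"topic\":\"" + topic + "\",\"partition\":" + p + ",\"replicas\":[" + replicas.mkString(",") + "]}"
    }
    "{\"version\":1,\"partitions\":[" + parts.mkString(",") + "]}"
  }
}
```

    propose(2, 2, Vector(140, 141), 1) gives partition 0 -> [141, 140] and partition 1 -> [140, 141], and toJson renders it in the same shape as the proposed configuration.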

    1.2 Execute the migration

    $ ./kafka-reassign-partitions --zookeeper ${zk_address}  --reassignment-json-file expand-cluster-reassignment.json --execute
    
    

    1.3 Check migration progress

    $ ./kafka-reassign-partitions --zookeeper ${zk_address} --reassignment-json-file expand-cluster-reassignment.json --verify
    
    

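    2 Source code analysis

    2.1 The script

    kafka-reassign-partitions.sh invokes kafka.admin.ReassignPartitionsCommand (ReassignPartitionsCommand.scala). Any exception thrown while it runs is printed to standard output, so if the script reports an error, this code is the place to start diagnosing it.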

    def main(args: Array[String]): Unit = {
        // omitted
        try {
          if(opts.options.has(opts.verifyOpt)) // verify progress
            verifyAssignment(zkUtils, opts)
          else if(opts.options.has(opts.generateOpt)) // generate the JSON
            generateAssignment(zkUtils, opts)
          else if (opts.options.has(opts.executeOpt)) // execute the migration
            executeAssignment(zkUtils, opts)
        } catch {
          case e: Throwable =>
            println("Partitions reassignment failed due to " + e.getMessage)
            println(Utils.stackTrace(e))
        } finally {
          val zkClient = zkUtils.zkClient
          if (zkClient != null)
            zkClient.close()
        }
    }
    

    2.1.1 executeAssignment

    executeAssignment starts the migration.

      def executeAssignment(zkUtils: ZkUtils,reassignmentJsonString: String){
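        // omitted: validation, de-duplication, creating reassignPartitionsCommand, etc.
    
        // print the current partition assignment so it can be used for rollback
        val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
        println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
          .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
    
        // key point: start the migration, i.e. write the JSON to ZooKeeper, under /admin/reassign_partitions to be precise
        // start the reassignment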
        if(reassignPartitionsCommand.reassignPartitions())
          println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
        else
          println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
      }
    

    Once executeAssignment has written the JSON to ZooKeeper, the controller broker, which watches that znode, sees the data change and starts the migration.

    2.1.2 verifyAssignment

    verifyAssignment checks the migration progress.

    def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
        // omitted
        println("Status of partition reassignment:")
        val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned) // key point
        reassignedPartitionsStatus.foreach { partition =>
          partition._2 match {
            case ReassignmentCompleted =>
              println("Reassignment of partition %s completed successfully".format(partition._1))
            case ReassignmentFailed =>
              println("Reassignment of partition %s failed".format(partition._1))
            case ReassignmentInProgress =>
              println("Reassignment of partition %s is still in progress".format(partition._1))
          }
        }
      }
    
      private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
      :Map[TopicAndPartition, ReassignmentStatus] = {
        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas) // read the in-progress reassignments from the znode /admin/reassign_partitions
        partitionsToBeReassigned.map { topicAndPartition =>
          (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkUtils,topicAndPartition._1,
            topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
        }
      }
    
      def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
                                                reassignedReplicas: Seq[Int],
                                                partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
                                                partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
        val newReplicas = partitionsToBeReassigned(topicAndPartition)
        partitionsBeingReassigned.get(topicAndPartition) match {
          case Some(partition) => ReassignmentInProgress // the partition is still listed, so the migration is still running
          case None =>  // otherwise it may have succeeded
            // check if the current replica assignment matches the expected one after reassignment
            val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
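            if(assignedReplicas == newReplicas) // key point: the partition is no longer listed; if the resulting replica list differs from the expected one, report an error
              ReassignmentCompleted
            else { // the error most often seen in practice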
              println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
                " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition))
              ReassignmentFailed
            }
        }
      }
    

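    The source shows that completion is decided by whether the partition is still listed in /admin/reassign_partitions. The reassignment counts as successful only if the partition is no longer listed there and the resulting AR (assigned replicas) matches the expected list exactly, including order.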
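    The decision can be reproduced without ZooKeeper. In this sketch, plain maps stand in for the JSON file passed to --verify, for the content of /admin/reassign_partitions and for the current AR in ZooKeeper; those stand-ins and the object name are assumptions. Because Seq equality is order-sensitive, an AR of (0,1) against an expected (1,0) is reported as failed.

```scala
object VerifySketch {
  sealed trait ReassignmentStatus
  case object ReassignmentCompleted extends ReassignmentStatus
  case object ReassignmentFailed extends ReassignmentStatus
  case object ReassignmentInProgress extends ReassignmentStatus

  type TopicPartition = (String, Int)

  def status(tp: TopicPartition,
             expected: Map[TopicPartition, Seq[Int]],        // the JSON file passed to --verify
             beingReassigned: Map[TopicPartition, Seq[Int]], // stand-in for /admin/reassign_partitions
             currentAr: Map[TopicPartition, Seq[Int]]        // stand-in for the AR stored in ZooKeeper
            ): ReassignmentStatus =
    if (beingReassigned.contains(tp)) ReassignmentInProgress                 // still listed
    else if (currentAr.get(tp).contains(expected(tp))) ReassignmentCompleted // ordered comparison
    else ReassignmentFailed                                                  // gone from the znode, AR differs
}
```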

    Note: in real migrations I ran into errors like the following several times; it is the message printed by the code above:

    don't match the list of replicas for reassignment

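    The code shows that this error occurs when the partition is no longer listed in /admin/reassign_partitions but the topic's current AR differs from the expected list. This usually means the controller hit an error during the migration and removed the entry without migrating anything. The exact cause can be found in the log of the controller broker.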

    2.2 How the broker performs the migration

    2.2.1 Entry point

    The controller broker is responsible for partition migration. When a broker is elected controller, it registers a watch on the znode /admin/reassign_partitions.

    private def registerReassignedPartitionsListener() = {
        zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
      }
    

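    So the migration work is driven by partitionReassignedListener. When the controller sees the data in /admin/reassign_partitions change, it reads the content, skips partitions that are already being reassigned or whose topic is being deleted, and starts migrating the rest.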

    class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
      this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
      val zkUtils = controller.controllerContext.zkUtils
      val controllerContext = controller.controllerContext
    
      @throws(classOf[Exception])
      def handleDataChange(dataPath: String, data: Object) {
        val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString) // parse the data in /admin/reassign_partitions into a Map[TopicAndPartition, replicas]
        val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
          partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
        }
        partitionsToBeReassigned.foreach { partitionToBeReassigned => // migrate each partition
          inLock(controllerContext.controllerLock) {
            if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) { // skip partitions of topics being deleted
              error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
                .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
              controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
            } else {
              val context = new ReassignedPartitionsContext(partitionToBeReassigned._2) // wrap the target replica list in a ReassignedPartitionsContext
              controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context) // key point: everything above only reads; the actual migration starts here
            }
          }
        }
      }
      // handleDataDeleted omitted
    }
    
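    Without ZooKeeper and locking, the listener's filtering comes down to two steps: drop partitions the controller already tracks as being reassigned, then set aside partitions whose topic is queued for deletion. A sketch with plain collections; the object and method names are hypothetical:

```scala
object ListenerFilterSketch {
  type TopicPartition = (String, Int)

  /** Returns (partitions to start reassigning, partitions skipped because their topic is being deleted). */
  def plan(requested: Map[TopicPartition, Seq[Int]],
           alreadyBeingReassigned: Set[TopicPartition],
           topicsQueuedForDeletion: Set[String]): (Map[TopicPartition, Seq[Int]], Set[TopicPartition]) = {
    // Mirrors the filterNot on controllerContext.partitionsBeingReassigned
    val fresh = requested.filterNot { case (tp, _) => alreadyBeingReassigned.contains(tp) }
    // Mirrors the isTopicQueuedUpForDeletion check; the real code removes skipped partitions from the znode
    val (skipped, toStart) = fresh.partition { case ((topic, _), _) => topicsQueuedForDeletion.contains(topic) }
    (toStart, skipped.keySet)
  }
}
```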

    2.2.2 KafkaController#initiateReassignReplicasForTopicPartition()

    initiateReassignReplicasForTopicPartition starts the migration, though most of what it does is validation. It also watches the partition's ISR, i.e. the znode /brokers/topics/{topic}/partitions/{partition}/state, which is central to how migration works.

    def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                            reassignedPartitionContext: ReassignedPartitionsContext) {
        val newReplicas = reassignedPartitionContext.newReplicas
        val topic = topicAndPartition.topic
        val partition = topicAndPartition.partition
        val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) // keep only target replicas on live brokers
        try {
          // key point: read the partition's AR from controllerContext
          val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
          assignedReplicasOpt match {
            case Some(assignedReplicas) =>
              // if the AR in controllerContext equals the target list, throw. Both are Seqs, so "equal" means the same replicas in the same order.
              if(assignedReplicas == newReplicas) {
                throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
                  " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
              } else {
                if(aliveNewReplicas == newReplicas) { // migrate only if every broker in the target list is alive
                  watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext) // key point, analyzed below
                  controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
                
                  deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
                  onPartitionReassignment(topicAndPartition, reassignedPartitionContext) // key point: this is where the migration work is actually done
                } else { // some target broker is not alive: throw
                  // some replica in RAR is not alive. Fail partition reassignment
                  throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
                    " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
                    "Failing partition reassignment")
                }
              }
            case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
              .format(topicAndPartition))
          }
        } catch {
          case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
          // remove the partition from the admin path to unblock the admin client
          removePartitionFromReassignedPartitions(topicAndPartition) // key point: as soon as the migration fails with an exception, the partition's entry in /admin/reassign_partitions is cleared (or the znode is deleted)
        }
      }
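    The checks that run before the real work can be summarized as a small decision function. In this sketch (hypothetical names), an Option stands in for the controllerContext.partitionReplicaAssignment lookup and a Set for liveBrokerIds. Every Reject corresponds to a path where the real method throws and then removes the partition from /admin/reassign_partitions.

```scala
object InitiateSketch {
  sealed trait Decision
  case object Start extends Decision
  final case class Reject(reason: String) extends Decision

  def decide(currentAr: Option[Seq[Int]], // stand-in for controllerContext.partitionReplicaAssignment.get(tp)
             newReplicas: Seq[Int],       // target list from /admin/reassign_partitions
             liveBrokers: Set[Int]        // stand-in for controllerContext.liveBrokerIds
            ): Decision = currentAr match {
    case None => Reject("partition does not exist")
    // Seq equality: same replicas in the same order
    case Some(ar) if ar == newReplicas => Reject("already assigned to " + newReplicas.mkString(","))
    case Some(_) =>
      val alive = newReplicas.filter(liveBrokers.contains)
      if (alive == newReplicas) Start // all target brokers alive: watch the ISR, then onPartitionReassignment
      else Reject("only " + alive.mkString(",") + " of " + newReplicas.mkString(",") + " are alive")
  }
}
```

    With a current AR of (0,1), a target of (1,0) is accepted because the order differs, even though the replicas are the same.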
    

    2.2.3 KafkaController#onPartitionReassignment

    The actual migration steps are carried out in onPartitionReassignment.

    def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
        val reassignedReplicas = reassignedPartitionContext.newReplicas
        areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match { // are all target replicas in the ISR, i.e. is the target list a subset of the ISR? E.g. ISR [1,2,3] and target [2,3] gives true.
    
        // false: set the AR to the union of the AR and the target list, which is like raising the partition's replica count. E.g. AR [1,2,3] and target [2,3,4] gives AR [1,2,3,4], i.e. one new replica is added to the partition.
          case false =>
            val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
            val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet // union of the AR and the target replica list
            // write the new AR to ZooKeeper and the controller cache
            updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
            // send LeaderAndIsr requests
            updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
              newAndOldReplicas.toSeq)
            // bring the newly added replicas online so they start fetching data from the leader
            startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
            info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
              "reassigned to catch up with the leader")
    
          // true: drop the AR replicas that are not in the target list. E.g. target [2,3] and AR [1,2,3]: replica 1 is taken offline
          case true =>
            //4. Wait until all replicas in RAR are in sync with the leader.
            val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
            //5. replicas in RAR -> OnlineReplica
            reassignedReplicas.foreach { replica =>
              replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
                replica)), OnlineReplica)
            }
            
            // if the current leader is not in the target list, a new leader has to be elected
            moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
           
            // the steps below retire the removed replicas: update the AR and ZooKeeper, send metadata requests, etc., and also delete the partition's data from /admin/reassign_partitions
            stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
            
            updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
    
            removePartitionFromReassignedPartitions(topicAndPartition)
            info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
            controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
    
            sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
        
            deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
        }
      }
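    The two branches can be condensed into a pure function of the current AR, the target list (RAR) and the ISR. This is a simplification: the real method also writes to ZooKeeper, sends LeaderAndIsr and UpdateMetadata requests and drives the replica state machine, none of which is modeled here. The sketch keeps the expanded AR ordered with distinct, whereas the original goes through toSet, so there the order is not guaranteed. Names are hypothetical.

```scala
object OnReassignmentSketch {
  /** Outcome of one onPartitionReassignment call. */
  final case class Step(newAr: Seq[Int], replicasToStop: Set[Int], finished: Boolean)

  def step(ar: Seq[Int], rar: Seq[Int], isr: Set[Int]): Step =
    if (!rar.forall(isr.contains)) {
      // "false" branch: expand AR to the union of RAR and AR; the new replicas start fetching
      Step((rar ++ ar).distinct, Set.empty, finished = false)
    } else {
      // "true" branch: every target replica is in the ISR, so stop the replicas outside RAR
      Step(rar, ar.toSet -- rar, finished = true)
    }
}
```

    For the [1, 2] to [2, 3] example in section 3, the first call returns AR [2, 3, 1]; once the ISR contains 2 and 3, the second call returns [2, 3] and stops replica 1.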
    

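    At this point you may wonder: when the target list is not a subset of the ISR, the code only adds replicas and never removes any. The key is that initiateReassignReplicasForTopicPartition also watches the partition's ISR by calling watchIsrChangesForReassignedPartition.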

    2.2.4 watchIsrChangesForReassignedPartition

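    The listener registered by this method watches the znode /brokers/topics/{topic}/partitions/{partition}/state. Once all replicas in the target list have caught up with the leader, it takes the replicas outside the target list offline, completing the migration. Its handleDataChange is shown below; topic, partition and reassignedReplicas are fields of the listener.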

    def handleDataChange(dataPath: String, data: Object) {
        inLock(controllerContext.controllerLock) {
          debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
          val topicAndPartition = TopicAndPartition(topic, partition)
          try {
            controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
              case Some(reassignedPartitionContext) =>
                val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
                newLeaderAndIsrOpt match {
                  case Some(leaderAndIsr) => // check if new replicas have joined ISR
                    val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet // intersection of the target list and the ISR
                    if(caughtUpReplicas == reassignedReplicas) { // all target replicas have caught up: call KafkaController#onPartitionReassignment again; this time it takes the true branch and takes replicas outside the target list offline
                      // resume the partition reassignment process
                      info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
                        .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                        "Resuming partition reassignment")
                      controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
                    }
                    else {
                      info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
                        .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                        "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
                    }
                  case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
                    .format(topicAndPartition, reassignedReplicas.mkString(",")))
                }
              case None =>
            }
          } catch {
            case e: Throwable => error("Error while handling partition reassignment", e)
          }
        }
      }
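    The listener's check is a set intersection followed by a comparison. A sketch with the same semantics, where the ISR would come from the state znode; the names are hypothetical:

```scala
object IsrListenerSketch {
  /** True when every target replica is in the ISR, i.e. the listener would call onPartitionReassignment again. */
  def shouldResume(reassignedReplicas: Set[Int], isr: Seq[Int]): Boolean =
    (reassignedReplicas & isr.toSet) == reassignedReplicas // intersection, as in the listener

  /** Target replicas that still need to catch up. */
  def lagging(reassignedReplicas: Set[Int], isr: Seq[Int]): Set[Int] =
    reassignedReplicas -- isr.toSet

  /** "caught up/total", the ratio printed in the listener's info log. */
  def progress(reassignedReplicas: Set[Int], isr: Seq[Int]): String =
    s"${(reassignedReplicas & isr.toSet).size}/${reassignedReplicas.size}"
}
```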
    

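    3 Summary: how the broker handles migration

    From the analysis above, the controller watches the znode /admin/reassign_partitions. When it finds a migration task, it first expands the partition's AR. For example, if the AR is [1, 2] and the target is [2, 3], the AR is expanded to [1, 2, 3] and the controller starts monitoring the ISR.

    Once replicas 2 and 3 have both caught up, i.e. are both in the ISR, the new replica 3 is in sync with the leader. Replica 1 can then be removed, giving the final assignment [2, 3]. In other words, migration first adds replicas and then removes them.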

    4 Possible problems

    4.1 Verification error

    Assigned replicas (0,1) don't match the list of replicas for reassignment (1,0) for partition [testTopic,0] Reassignment of partition [testTopic,0] failed 
    

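    As explained in 2.1.2, this error means the partition's entry in /admin/reassign_partitions has already been removed, but the AR differs from the target list. Note that the comparison is order-sensitive: in this example the replicas are the same and only their order differs. Check the controller's log to see whether it threw an exception during the migration.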

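    4.2 The migration stays in progress and never completes

    The migration completes only after every replica in the target list has caught up with the leader; if the target replicas never catch up, it never completes. Check the znode /brokers/topics/{topic}/partitions/{partition}/state in ZooKeeper and see whether the target replicas are in the ISR. If they are not, those replicas have not finished fetching data from the leader. Why not needs case-by-case analysis: a large data volume simply takes time to copy, or there may be another cause, such as brokers being down.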
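    To do that check quickly, a small helper can pull the isr array out of the state znode payload (for example, as printed by get in zkCli.sh) and list the target replicas missing from it. This is a sketch: it assumes the payload looks like {"controller_epoch":3,"leader":61,"version":1,"leader_epoch":5,"isr":[61,140]} (example values), and it uses a regex only for illustration; a real JSON parser is the safer choice. Names are hypothetical.

```scala
object StateZnodeCheck {
  // Matches "isr":[...] and captures the comma-separated broker ids (illustration only).
  private val IsrPattern = "\"isr\"\\s*:\\s*\\[([^\\]]*)\\]".r

  /** ISR listed in a partition state znode payload; empty if no "isr" array is found. */
  def isr(stateJson: String): Seq[Int] =
    IsrPattern.findFirstMatchIn(stateJson) match {
      case Some(m) => m.group(1).split(",").map(_.trim).filter(_.nonEmpty).map(_.toInt).toSeq
      case None    => Seq.empty
    }

  /** Target replicas not yet in the ISR: the ones the reassignment is still waiting for. */
  def notYetInIsr(stateJson: String, target: Seq[Int]): Seq[Int] = {
    val current = isr(stateJson).toSet
    target.filterNot(current.contains)
  }
}
```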

    Original article: https://www.cnblogs.com/set-cookie/p/9614241.html