[Original] Big Data Basics: Kafka (2): the reassign process

The reassign process

• Decide which brokers the replicas of each partition go to
  • The first replica of each partition is placed round-robin over the brokers, starting from a randomly chosen position
  • The remaining replicas are placed on the next n-1 brokers in order, after applying a shift that is randomly seeded and grows as partitions are assigned
• Decide which of that broker's log_dirs will hold the replica
  • If the broker already has a log for this partition, the existing directory is reused
  • Otherwise nextLogDir picks a directory, choosing the live log directory that currently holds the fewest partitions

This process does not guarantee that data is evenly distributed across the disks of every broker; when necessary the generated reassign plan has to be tweaked by hand to get a better balance, especially when partition sizes are wildly uneven.
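
Before diving into the code, here is a minimal, self-contained sketch of the placement rule just described (an illustration only, not the Kafka source; broker ids and counts below are hypothetical): the first replica walks round-robin from a random start position, and the remaining replicas follow at a shift that is randomly seeded and bumped once per round of partitions.

  import scala.util.Random

  // Illustration of the rack-unaware placement rule (simplified, not Kafka code).
  object PlacementSketch {
    def assign(nPartitions: Int, replicationFactor: Int, brokers: IndexedSeq[Int]): Map[Int, Seq[Int]] = {
      val n = brokers.size
      val startIndex = Random.nextInt(n)      // random start for the first replicas
      var shift = Random.nextInt(n)           // random initial shift for the followers
      (0 until nPartitions).map { p =>
        if (p > 0 && p % n == 0) shift += 1   // bump the shift once per full round of partitions
        val first = (p + startIndex) % n
        val followers = (0 until replicationFactor - 1).map { j =>
          brokers((first + 1 + (shift + j) % (n - 1)) % n)  // never lands back on `first`
        }
        p -> (brokers(first) +: followers)
      }.toMap
    }

    def main(args: Array[String]): Unit = {
      // hypothetical cluster: 5 brokers, 10 partitions, replication factor 3
      assign(10, 3, Vector(0, 1, 2, 3, 4)).toSeq.sortBy(_._1)
        .foreach { case (p, rs) => println(s"p$p -> brokers ${rs.mkString(",")}") }
    }
  }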

Detailed code walkthrough

    kafka-reassign-partitions.sh

    exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"
    

    kafka.admin.ReassignPartitionsCommand

The main method dispatches on the command-line options: generateAssignment produces a plan, executeAssignment executes one, and verifyAssignment checks its progress.

      def main(args: Array[String]): Unit = {
        val opts = validateAndParseArgs(args)
        val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
        val time = Time.SYSTEM
        val zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
    
        val adminClientOpt = createAdminClient(opts)
    
        try {
          if(opts.options.has(opts.verifyOpt))
            verifyAssignment(zkClient, adminClientOpt, opts)
          else if(opts.options.has(opts.generateOpt))
            generateAssignment(zkClient, opts)
          else if (opts.options.has(opts.executeOpt))
            executeAssignment(zkClient, adminClientOpt, opts)
        } catch {
          case e: Throwable =>
            println("Partitions reassignment failed due to " + e.getMessage)
            println(Utils.stackTrace(e))
        } finally zkClient.close()
      }
    

The generateAssignment method is shown below; brokerListToReassign comes from the --broker-list option and topicsToMoveJsonString is the content of the --topics-to-move-json-file argument.

      def generateAssignment(zkClient: KafkaZkClient, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]]) = {
        val topicsToReassign = parseTopicsData(topicsToMoveJsonString)
        val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
        if (duplicateTopicsToReassign.nonEmpty)
          throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
        val currentAssignment = zkClient.getReplicaAssignmentForTopics(topicsToReassign.toSet)
    
        val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
        val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
        val adminZkClient = new AdminZkClient(zkClient)
        val brokerMetadatas = adminZkClient.getBrokerMetadatas(rackAwareMode, Some(brokerListToReassign))
    
        val partitionsToBeReassigned = mutable.Map[TopicPartition, Seq[Int]]()
        groupedByTopic.foreach { case (topic, assignment) =>
          val (_, replicas) = assignment.head
          val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
          partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
            new TopicPartition(topic, partition) -> replicas
          }
        }
        (partitionsToBeReassigned, currentAssignment)
      }
    

The heavy lifting is done by AdminUtils.assignReplicasToBrokers:

      /**
       * There are 3 goals of replica assignment:
       *
       * 1. Spread the replicas evenly among brokers.
       * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
       * 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
       *
       * To achieve this goal for replica assignment without considering racks, we:
       * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
       * 2. Assign the remaining replicas of each partition with an increasing shift.
       *
       * Here is an example of assigning
       * broker-0  broker-1  broker-2  broker-3  broker-4
       * p0        p1        p2        p3        p4       (1st replica)
       * p5        p6        p7        p8        p9       (1st replica)
       * p4        p0        p1        p2        p3       (2nd replica)
       * p8        p9        p5        p6        p7       (2nd replica)
   * p3        p4        p0        p1        p2       (3rd replica)
   * p7        p8        p9        p5        p6       (3rd replica)
    ...
      def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                                  nPartitions: Int,
                                  replicationFactor: Int,
                                  fixedStartIndex: Int = -1,
                                  startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
        if (nPartitions <= 0)
          throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
        if (replicationFactor <= 0)
          throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
        if (replicationFactor > brokerMetadatas.size)
          throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
        if (brokerMetadatas.forall(_.rack.isEmpty))
          assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
            startPartitionId)
        else {
          if (brokerMetadatas.exists(_.rack.isEmpty))
            throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
          assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
            startPartitionId)
        }
      }
    

Depending on whether rack information is present, a different branch is taken; in most scenarios the code goes through assignReplicasToBrokersRackUnaware:

      private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                                     replicationFactor: Int,
                                                     brokerList: Seq[Int],
                                                     fixedStartIndex: Int,
                                                     startPartitionId: Int): Map[Int, Seq[Int]] = {
        val ret = mutable.Map[Int, Seq[Int]]()
        val brokerArray = brokerList.toArray
        val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
        var currentPartitionId = math.max(0, startPartitionId)
        var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
        for (_ <- 0 until nPartitions) {
          if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
            nextReplicaShift += 1
          val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
          val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
          for (j <- 0 until replicationFactor - 1)
            replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
          ret.put(currentPartitionId, replicaBuffer)
          currentPartitionId += 1
        }
        ret
      }
    

As the code shows, the assignment logic is:

• The first replica walks round-robin over all brokers, starting from a randomly chosen position
• The remaining n-1 replicas are placed on consecutive brokers after applying a shift, which starts at a random value and is incremented every time a full round of partitions has been assigned
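
For reference, the replicaIndex helper used in the loop above is not included in the snippet; in this era of the code base it looks roughly like the following (paraphrased). The leading "1 +" keeps the computed shift in the range [1, nBrokers - 1], which is what guarantees that a follower replica never lands on the same broker as the partition's first replica.

  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    // shift is always in [1, nBrokers - 1], so the result can never equal firstReplicaIndex
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }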

Next, executeAssignment:

      def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
        val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
        val adminZkClient = new AdminZkClient(zkClient)
        val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)
    
        // If there is an existing rebalance running, attempt to change its throttle
        if (zkClient.reassignPartitionsInProgress()) {
          println("There is an existing assignment running.")
          reassignPartitionsCommand.maybeLimit(throttle)
        } else {
          printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))
          if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
            println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
          if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {
            println("Successfully started reassignment of partitions.")
          } else
            println("Failed to reassign partitions %s".format(partitionAssignment))
        }
      }
    

It mainly calls the reassignPartitions method:

      def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
        maybeThrottle(throttle)
        try {
          val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkClient, p.topic, p.partition) }
          if (validPartitions.isEmpty) false
          else {
            if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
              throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory")
            val startTimeMs = System.currentTimeMillis()
    
            // Send AlterReplicaLogDirsRequest to allow broker to create replica in the right log dir later if the replica has not been created yet.
            if (proposedReplicaAssignment.nonEmpty)
              alterReplicaLogDirsIgnoreReplicaNotAvailable(proposedReplicaAssignment, adminClientOpt.get, timeoutMs)
    
            // Create reassignment znode so that controller will send LeaderAndIsrRequest to create replica in the broker
            zkClient.createPartitionReassignment(validPartitions.map({case (key, value) => (new TopicPartition(key.topic, key.partition), value)}).toMap)
    
            // Send AlterReplicaLogDirsRequest again to make sure broker will start to move replica to the specified log directory.
            // It may take some time for controller to create replica in the broker. Retry if the replica has not been created.
            var remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
            val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica]
            while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) {
              replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable(
                proposedReplicaAssignment.filterKeys(replica => !replicasAssignedToFutureDir.contains(replica)), adminClientOpt.get, remainingTimeMs)
              Thread.sleep(100)
              remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
            }
            replicasAssignedToFutureDir.size == proposedReplicaAssignment.size
          }
        } catch {
          case _: NodeExistsException =>
            val partitionsBeingReassigned = zkClient.getPartitionReassignment
            throw new AdminCommandFailedException("Partition reassignment currently in " +
              "progress for %s. Aborting operation".format(partitionsBeingReassigned))
        }
      }
    

The key step is zkClient.createPartitionReassignment, which creates the znode /admin/reassign_partitions in ZooKeeper (/kafka/admin/reassign_partitions here because of the chroot); the value of the node is the reassign plan itself, a JSON document along the lines of {"version":1,"partitions":[{"topic":"t1","partition":0,"replicas":[1,2]}]}. The command-line tool's control flow ends here; the rest is up to the controller.
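
The other half of reassignPartitions talks to brokers directly through the Java admin client: if the plan pins replicas to specific log directories, alterReplicaLogDirsIgnoreReplicaNotAvailable issues an AlterReplicaLogDirs call for them. Below is a minimal sketch of that kind of call, assuming the standard org.apache.kafka.clients.admin.AdminClient API of this Kafka era; the topic name, broker id, bootstrap address and directory are hypothetical.

  import java.util.{Collections, Properties}
  import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
  import org.apache.kafka.common.TopicPartitionReplica

  // Ask broker 1 to host (or move) the replica of test-0 in a specific log directory.
  object AlterReplicaLogDirSketch {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
      val admin = AdminClient.create(props)
      try {
        val replica = new TopicPartitionReplica("test", 0, 1)                 // topic, partition, broker id
        val target  = Collections.singletonMap(replica, "/data/kafka-logs-2") // hypothetical log dir
        admin.alterReplicaLogDirs(target).all().get()                         // wait for the broker's response
      } finally admin.close()
    }
  }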

    kafka.controller.KafkaController

      private def initializeControllerContext() {
        // update controller cache with delete topic information
        controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
        controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
        registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
        zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach {
          case (topicPartition, assignedReplicas) => controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
        }
        controllerContext.partitionLeadershipInfo.clear()
        controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
        // register broker modifications handlers
        registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
        // update the leader and isr cache for all existing partitions from Zookeeper
        updateLeaderAndIsrCache()
        // start the channel manager
        startChannelManager()
        initializePartitionReassignment()
        info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
        info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")
        info(s"Current list of topics in the cluster: ${controllerContext.allTopics}")
      }
    

When the controller initializes it calls initializeControllerContext, which in turn calls initializePartitionReassignment:

      private def initializePartitionReassignment() {
        // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
        val partitionsBeingReassigned = zkClient.getPartitionReassignment
        info(s"Partitions being reassigned: $partitionsBeingReassigned")
    
        controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned.iterator.map { case (tp, newReplicas) =>
          val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, tp)
          tp -> new ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler)
        }
      }
    

For every partition being reassigned, a watcher on its ZooKeeper state (PartitionReassignmentIsrChangeHandler) is set up; the handling logic lives in PartitionReassignmentIsrChange:

      case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
        override def state: ControllerState = ControllerState.PartitionReassignment
    
        override def process(): Unit = {
          if (!isActive) return
          // check if this partition is still being reassigned or not
          controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
            val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
            zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
              case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
                val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
                val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
                if (caughtUpReplicas == reassignedReplicas) {
                  // resume the partition reassignment process
                  info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
                    s"partition $partition being reassigned. Resuming partition reassignment")
                  onPartitionReassignment(partition, reassignedPartitionContext)
                }
                else {
                  info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
                    s"partition $partition being reassigned. Replica(s) " +
                    s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
                }
              case None => error(s"Error handling reassignment of partition $partition to replicas " +
                             s"${reassignedReplicas.mkString(",")} as it was never created")
            }
          }
        }
      }
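
The resume condition in process() boils down to a plain set comparison between the reassignment target and the ISR just read from ZooKeeper. With hypothetical values:

  // Hypothetical values, illustrating only the caught-up check above.
  val reassignedReplicas = Set(1, 4, 5)                              // RAR: target replicas from the plan
  val isr                = Set(1, 2, 4)                              // current ISR of the partition
  val caughtUpReplicas   = reassignedReplicas & isr                  // Set(1, 4)
  val resume             = caughtUpReplicas == reassignedReplicas    // false: replica 5 is still catching up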
    

In the normal path, the reassignment only proceeds once onPartitionReassignment is invoked, and the precondition is that all replicas in the plan have joined the ISR. Besides that, one more callback can also trigger reassignment: onBrokerStartup.

      private def onBrokerStartup(newBrokers: Seq[Int]) {
        info(s"New broker startup callback for ${newBrokers.mkString(",")}")
        newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
        val newBrokersSet = newBrokers.toSet
        // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
        // broker via this update.
        // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
        // common controlled shutdown case, the metadata will reach the new brokers faster
        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
        // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
        // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
        val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
        replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
        // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
        // to see if these brokers can become leaders for some/all of those
        partitionStateMachine.triggerOnlinePartitionStateChange()
        // check if reassignment of some partitions need to be restarted
        val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
          case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
        }
        partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) => onPartitionReassignment(tp, context) }
        // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
        // on the newly restarted brokers, there is a chance that topic deletion can resume
        val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
        if (replicasForTopicsToBeDeleted.nonEmpty) {
          info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
            s"${topicDeletionManager.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
            s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
          topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
        }
        registerBrokerModificationsHandler(newBrokers)
      }
    

Now the details of onPartitionReassignment:

      private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
        val reassignedReplicas = reassignedPartitionContext.newReplicas
        if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {
          info(s"New replicas ${reassignedReplicas.mkString(",")} for partition $topicPartition being reassigned not yet " +
            "caught up with the leader")
          val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicPartition).toSet
          val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet
          //1. Update AR in ZK with OAR + RAR.
          updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)
          //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
          updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition),
            newAndOldReplicas.toSeq)
          //3. replicas in RAR - OAR -> NewReplica
          startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
          info(s"Waiting for new replicas ${reassignedReplicas.mkString(",")} for partition ${topicPartition} being " +
            "reassigned to catch up with the leader")
        }
    

Only the first half is shown above. The key part is step 2, which sends a LeaderAndIsr request to every replica in OAR + RAR (old plus new replicas). Once all new replicas have caught up and joined the ISR, the second half of the method (not shown) completes the switch-over: it elects a new leader if the current one is not in the new replica set, stops and deletes the replicas that are no longer assigned, writes the final assignment back to ZooKeeper, and removes the partition from /admin/reassign_partitions.

Finally, how does a broker handle the LeaderAndIsr request?

    kafka.server.ReplicaManager

      def becomeLeaderOrFollower(correlationId: Int,
                                 leaderAndIsrRequest: LeaderAndIsrRequest,
                                 onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
        leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          stateChangeLogger.trace(s"Received LeaderAndIsr request $stateInfo " +
            s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " +
            s"epoch ${leaderAndIsrRequest.controllerEpoch} for partition $topicPartition")
        }
        replicaStateChangeLock synchronized {
          if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
              s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
              s"Latest known controller epoch is $controllerEpoch")
            leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
          } else {
            val responseMap = new mutable.HashMap[TopicPartition, Errors]
            val controllerId = leaderAndIsrRequest.controllerId
            controllerEpoch = leaderAndIsrRequest.controllerEpoch
    
            // First check partition's leader epoch
            val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
            val newPartitions = leaderAndIsrRequest.partitionStates.asScala.keys.filter(topicPartition => getPartition(topicPartition).isEmpty)
    
            leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
              val partition = getOrCreatePartition(topicPartition)
              val partitionLeaderEpoch = partition.getLeaderEpoch
              if (partition eq ReplicaManager.OfflinePartition) {
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                  s"controller $controllerId with correlation id $correlationId " +
                  s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                  "partition is in an offline log directory")
                responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
              } else if (partitionLeaderEpoch < stateInfo.basePartitionState.leaderEpoch) {
                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
                if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
                  partitionState.put(partition, stateInfo)
                else {
                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
                    s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
                }
              } else {
                // Otherwise record the error code in response
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                  s"controller $controllerId with correlation id $correlationId " +
                  s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                  s"leader epoch ${stateInfo.basePartitionState.leaderEpoch} is not higher than the current " +
                  s"leader epoch $partitionLeaderEpoch")
                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
              }
            }
    
            val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
              stateInfo.basePartitionState.leader == localBrokerId
            }
            val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
    
            val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
              makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
            else
              Set.empty[Partition]
            val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
              makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
            else
              Set.empty[Partition]
    
            leaderAndIsrRequest.partitionStates.asScala.keys.foreach(topicPartition =>
              /*
               * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
               * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
               * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
               * we need to map this topic-partition to OfflinePartition instead.
               */
              if (getReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
                allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
            )
    
            // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
            // have been completely populated before starting the checkpointing there by avoiding weird race conditions
            if (!hwThreadInitialized) {
              startHighWaterMarksCheckPointThread()
              hwThreadInitialized = true
            }
    
            val newOnlineReplicas = newPartitions.flatMap(topicPartition => getReplica(topicPartition))
            // Add future replica to partition's map
            val futureReplicasAndInitialOffset = newOnlineReplicas.filter { replica =>
              logManager.getLog(replica.topicPartition, isFuture = true).isDefined
            }.map { replica =>
              replica.topicPartition -> BrokerAndInitialOffset(BrokerEndPoint(config.brokerId, "localhost", -1), replica.highWatermark.messageOffset)
            }.toMap
            futureReplicasAndInitialOffset.keys.foreach(tp => getPartition(tp).get.getOrCreateReplica(Request.FutureLocalReplicaId))
    
            // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move replica from source dir to destination dir
            futureReplicasAndInitialOffset.keys.foreach(logManager.abortAndPauseCleaning)
            replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
    
            replicaFetcherManager.shutdownIdleFetcherThreads()
            replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
            onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
            new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)
          }
        }
      }
    

The handling goes roughly like this: the broker first creates the Partition object if it does not exist yet, then validates the request (controller epoch, leader epoch, whether the local broker is in the replica list); if everything checks out it becomes leader or follower for the partition, starts replicating data and eventually joins the partition's ISR.

The local log is created starting from this line:

    val newOnlineReplicas = newPartitions.flatMap(topicPartition => getReplica(topicPartition))
    

The getReplica calls end up in Partition.getReplica:

      def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId))
    
      def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
        getReplica(replicaId).getOrElse(
          throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition"))
    

Creating the replica eventually reaches LogManager.getOrCreateLog:

      def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {
        logCreationOrDeletionLock synchronized {
          getLog(topicPartition, isFuture).getOrElse {
            // create the log if it has not already been created in another thread
            if (!isNew && offlineLogDirs.nonEmpty)
              throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")
    
            val logDir = {
              val preferredLogDir = preferredLogDirs.get(topicPartition)
    
              if (isFuture) {
                if (preferredLogDir == null)
                  throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
                else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
                  throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
              }
    
              if (preferredLogDir != null)
                preferredLogDir
              else
                nextLogDir().getAbsolutePath
            }
            if (!isLogDirOnline(logDir))
              throw new KafkaStorageException(s"Can not create log for $topicPartition because log directory $logDir is offline")
    
            try {
              val dir = {
                if (isFuture)
                  new File(logDir, Log.logFutureDirName(topicPartition))
                else
                  new File(logDir, Log.logDirName(topicPartition))
              }
              Files.createDirectories(dir.toPath)
    ...
    

Only the first half matters here. When a log is created, the preferred directory recorded in preferredLogDirs is tried first; if there is none, nextLogDir() picks an available directory (the live log directory currently holding the fewest partitions). preferredLogDirs is populated by maybeUpdatePreferredLogDir:

      def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
        // Do not cache the preferred log directory if either the current log or the future log for this partition exists in the specified logDir
        if (!getLog(topicPartition).exists(_.dir.getParent == logDir) &&
            !getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir))
          preferredLogDirs.put(topicPartition, logDir)
      }
    

This records, per partition, the requested destination log directory into preferredLogDirs; it is called while handling the AlterReplicaLogDirsRequest that reassignPartitions sent earlier, which is how a log_dir specified in the reassignment plan ends up being honored by getOrCreateLog.
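
When no preferred directory has been recorded for a partition, getOrCreateLog falls back to nextLogDir(). Below is a simplified, self-contained sketch of that fallback (an approximation, assuming the selection rule of this era: the live log directory currently holding the fewest partition logs wins). Because the rule counts logs rather than bytes, a directory full of small partitions and one with a few huge partitions look identical to it, which is the root cause of the disk imbalance mentioned at the beginning.

  import java.io.File

  // Simplified illustration of the "least loaded directory" fallback (not Kafka code).
  object NextLogDirSketch {
    def nextLogDir(liveLogDirs: Seq[File]): File = {
      require(liveLogDirs.nonEmpty, "no live log directories")
      if (liveLogDirs.size == 1) liveLogDirs.head
      else liveLogDirs.minBy { dir =>
        // count existing partition directories; sizes are deliberately ignored, as in the real rule
        Option(dir.listFiles()).map(_.count(_.isDirectory)).getOrElse(0)
      }
    }

    def main(args: Array[String]): Unit = {
      // hypothetical data directories configured via log.dirs
      println(nextLogDir(Seq(new File("/data/kafka-logs-1"), new File("/data/kafka-logs-2"))))
    }
  }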


Original article: https://www.cnblogs.com/barneywill/p/15025404.html