Apache Kafka Source Code Analysis: ReplicaStateMachine

    startup

Called from onControllerFailover:

    /**
       * Invoked on successful controller election. First registers a broker change listener since that triggers all
       * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
       * Then triggers the OnlineReplica state change for all replicas.
       */
      def startup() {
        // initialize replica state
        initializeReplicaState()
        // set started flag
        hasStarted.set(true)
        // move all Online replicas to Online
        handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
    
        info("Started replica state machine with initial state -> " + replicaState.toString())
      }

     

    initializeReplicaState

This just iterates over every replica of every topic partition: if the broker hosting the replica is alive, the replica's state is set to OnlineReplica.

    private def initializeReplicaState() {
        for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
          val topic = topicPartition.topic
          val partition = topicPartition.partition
          assignedReplicas.foreach { replicaId =>
            val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
            controllerContext.liveBrokerIds.contains(replicaId) match {
              case true => replicaState.put(partitionAndReplica, OnlineReplica)
          case false =>  // unclear at first why this is not OfflineReplica; the comment below explains
                // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
                // This is required during controller failover since during controller failover a broker can go down,
                // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
                replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
            }
          }
        }
      }

     

    registerListeners

Called from onControllerFailover:

    private def registerBrokerChangeListener() = {
        zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) //"/brokers/ids"
      }

Notice that the PartitionStateMachine listens on the topic path and on the partition paths under each topic, but not on brokers: a partition is only a logical concept, and broker state by itself does not change partitions.

For the ReplicaStateMachine, broker state directly affects replica state, so it has to listen on the broker path.
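
A small summary of what each state machine watches in ZooKeeper. The broker path is confirmed by registerBrokerChangeListener above; the topic-level paths are my assumption based on ZkUtils' path constants:

    // Hedged summary, plain data, not Kafka source; "<topic>" is a per-topic placeholder
    val watchedPaths = Map(
      "PartitionStateMachine" -> Seq("/brokers/topics", "/brokers/topics/<topic>"), // topics, and partitions under each topic
      "ReplicaStateMachine"   -> Seq("/brokers/ids")                                // live brokers
    )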

    /**
       * This is the zookeeper listener that triggers all the state transitions for a replica
       */
      class BrokerChangeListener() extends IZkChildListener with Logging {
      
        def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
          info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
          inLock(controllerContext.controllerLock) {
            if (hasStarted.get) {
              ControllerStats.leaderElectionTimer.time {
                try {
              val curBrokerIds = currentBrokerList.map(_.toInt).toSet  // the current broker list in ZK
              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds // brokers missing from the context are the new ones
              val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)) // read each new broker's data from ZK and wrap it in a Broker object
              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get) // each entry is an Option: .isDefined tests for Some, .get extracts the Broker
              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds // brokers in the context but gone from ZK are the dead ones
              controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) // refresh liveBrokers in the context
                  info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                    .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_)) // update the ChannelManager's broker list, since it sends requests to every broker
                  deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
                  if(newBrokerIds.size > 0)
                    controller.onBrokerStartup(newBrokerIds.toSeq)
                  if(deadBrokerIds.size > 0)
                    controller.onBrokerFailure(deadBrokerIds.toSeq)
                } catch {
                  case e: Throwable => error("Error while handling broker changes", e)
                }
              }
            }
          }
        }
      }

When new brokers appear, or brokers fail, the listener invokes the controller's callbacks to handle it.
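
A toy example of the Set arithmetic the listener uses to classify brokers (the ids are made up):

    val curBrokerIds   = Set(1, 2, 4)                   // children of "/brokers/ids" read from ZK
    val knownBrokerIds = Set(1, 2, 3)                   // controllerContext.liveOrShuttingDownBrokerIds
    val newBrokerIds   = curBrokerIds -- knownBrokerIds // Set(4): in ZK but unknown to the controller
    val deadBrokerIds  = knownBrokerIds -- curBrokerIds // Set(3): known to the controller but gone from ZK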

     

    KafkaController.onBrokerStartup

    def onBrokerStartup(newBrokers: Seq[Int]) {
        info("New broker startup callback for %s".format(newBrokers.mkString(",")))
        val newBrokersSet = newBrokers.toSet
        // send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown
        // leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the
        // metadata will reach the new brokers faster
    sendUpdateMetadataRequest(newBrokers) // send the controller's metadata to the new brokers
    val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet) // find all replicas on the new brokers
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica) // move all of these replicas to Online
    // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
    // to see if these brokers can become leaders for some/all of those
    partitionStateMachine.triggerOnlinePartitionStateChange() // try to move new or offline partitions to Online
    // check if reassignment of some partitions need to be restarted
    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
      case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
    }
    partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2)) // resume those reassignments
    // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
    // on the newly restarted brokers, there is a chance that topic deletion can resume
    val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
    if(replicasForTopicsToBeDeleted.size > 0) {
      info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
        "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
        deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
      deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
    }
  }

In essence this moves all replicas on the new brokers to Online, and triggers any new or offline partitions toward Online.
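
For illustration, a simplified re-implementation of what controllerContext.replicasOnBrokers computes; the stand-in case classes are mine, but the real method works off the same partitionReplicaAssignment map:

    // simplified stand-ins for the controller's types, for illustration only
    case class TopicAndPartition(topic: String, partition: Int)
    case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

    // collect every (topic, partition, replicaId) whose replica lives on one of the given brokers
    def replicasOnBrokers(brokerIds: Set[Int],
                          assignment: Map[TopicAndPartition, Seq[Int]]): Set[PartitionAndReplica] =
      assignment.flatMap { case (tp, replicas) =>
        replicas.filter(brokerIds.contains)
                .map(r => PartitionAndReplica(tp.topic, tp.partition, r))
      }.toSet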

     

    KafkaController.onBrokerFailure

    def onBrokerFailure(deadBrokers: Seq[Int]) {
        val deadBrokersSet = deadBrokers.toSet
        // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
    val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader => // find all partitions whose leader is on a dead broker
          deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
            !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
    partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition) // the partition leader is dead, so the partition goes Offline
    // trigger OnlinePartition state changes for offline or new partitions
    partitionStateMachine.triggerOnlinePartitionStateChange() // a partition with more than one replica can still go from Offline back to Online, hence the trigger
        // filter out the replicas that belong to topics that are being deleted
        var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
    val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic)) // the replicas on the dead brokers, excluding topics queued for deletion
        // handle dead replicas
    replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica) // move all replicas on the dead brokers to Offline
        // check if topic deletion state for the dead replicas needs to be updated
        val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
        if(replicasForTopicsToBeDeleted.size > 0) {
          // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
          // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
          // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
          deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
        }
      }

This is not complicated either:
1. Every replica on a dead broker is set to Offline.
2. If a partition's leader was on a dead broker, the partition is first set to Offline, and triggerOnlinePartitionStateChange then tries to re-elect a leader and bring it back Online (see the toy walk-through below).
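
A toy walk-through of step 2, reusing the simplified TopicAndPartition stand-in from the earlier sketch and made-up leadership data:

    val deadBrokersSet = Set(1)
    val leaders = Map(TopicAndPartition("t", 0) -> 1,  // t-0 led by broker 1 (now dead)
                      TopicAndPartition("t", 1) -> 2)  // t-1 led by broker 2 (still alive)
    val partitionsWithoutLeader =
      leaders.filter { case (_, leader) => deadBrokersSet.contains(leader) }.keySet
    // Set(TopicAndPartition("t", 0)): only t-0 lost its leader; it is moved to OfflinePartition,
    // and triggerOnlinePartitionStateChange() then tries to elect a new leader to bring it back Online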

     

    handleStateChange

    def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                            callbacks: Callbacks) {
        val topic = partitionAndReplica.topic
        val partition = partitionAndReplica.partition
        val replicaId = partitionAndReplica.replica
        val topicAndPartition = TopicAndPartition(topic, partition)
        val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
        try {
          val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
          targetState match {
        case NewReplica => // a brand-new replica
          assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) // the only valid previous state is NonExistentReplica
          // start replica as a follower to the current leader for its partition
          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) // read the partition's LeaderAndIsr from ZK
              leaderIsrAndControllerEpochOpt match {
                case Some(leaderIsrAndControllerEpoch) =>
              if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)  // why this check? discussed below
                throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                  .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), // send the LeaderAndIsr info to the broker hosting the new replica
                                                                      topic, partition, leaderIsrAndControllerEpoch,
                                                                      replicaAssignment)
                case None => // new leader request will be sent to this replica when one gets elected
              }
              replicaState.put(partitionAndReplica, NewReplica)
              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                                targetState))
        case ReplicaDeletionStarted =>  // begin deleting the replica
          assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState) // the only valid previous state is OfflineReplica
          replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
          // send stop replica command
          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true, // send a StopReplica request with deletePartition = true, so the replica really is deleted
                callbacks.stopReplicaResponseCallback)
              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case ReplicaDeletionIneligible => // replica deletion failed
              assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
              replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case ReplicaDeletionSuccessful => // replica deletion succeeded
              assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
              replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case NonExistentReplica => // the replica no longer exists
          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
          // remove this replica from the assigned replicas list for its partition
          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId)) // remove the replica from partitionReplicaAssignment
          replicaState.remove(partitionAndReplica) // remove the replica from replicaState
              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
            case OnlineReplica => // OnlineReplica
              assertValidPreviousStates(partitionAndReplica,
            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState) // note how many previous states are valid here
              replicaState(partitionAndReplica) match {
            case NewReplica => // on NewReplica -> Online the replica must be added to the AR (assigned replicas)
                  // add this replica to the assigned replicas list for its partition
                  val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                  if(!currentAssignedReplicas.contains(replicaId))
                    controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                                    targetState))
                case _ =>
                  // check if the leader for this partition ever existed
                  controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                case Some(leaderIsrAndControllerEpoch) => // send the partition's LeaderAndIsr to the broker hosting this replica
                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                        replicaAssignment)
                      replicaState.put(partitionAndReplica, OnlineReplica)
                      stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                        .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                    case None => // that means the partition was never in OnlinePartition state, this means the broker never
                      // started a log for that partition and does not have a high watermark value for this partition
                  }
              }
              replicaState.put(partitionAndReplica, OnlineReplica)
            case OfflineReplica => // OfflineReplica
              assertValidPreviousStates(partitionAndReplica,
                List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
              // send stop replica command to the replica so that it stops fetching from the leader
          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false) // an offline replica also gets a StopReplica request, but with deletePartition = false, so the replica itself is not deleted
              // As an optimization, the controller removes dead replicas from the ISR
              val leaderAndIsrIsEmpty: Boolean =
                controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                  case Some(currLeaderIsrAndControllerEpoch) =>
                controller.removeReplicaFromIsr(topic, partition, replicaId) match {  // remove this replica from the ISR
                      case Some(updatedLeaderIsrAndControllerEpoch) =>
                        // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
                        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                        if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId), // notify the other brokers via a LeaderAndIsr request that the ISR has shrunk
                            topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                        }
                        replicaState.put(partitionAndReplica, OfflineReplica)
                        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                        false
                      case None =>
                        true
                    }
                  case None =>
                    true
                }
              if (leaderAndIsrIsEmpty)
                throw new StateChangeFailedException(
                  "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
                  .format(replicaId, topicAndPartition))
          }
        }
        catch {
          case t: Throwable =>
            stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
                                      .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
        }
      }

There are more states here than in the PartitionStateMachine, but most transitions just record the new state.
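
As a reading aid, the legal previous states per target state, collected from the assertValidPreviousStates calls above (plain data, not code from the source):

    val validPreviousStates = Map(
      "NewReplica"                -> List("NonExistentReplica"),
      "OnlineReplica"             -> List("NewReplica", "OnlineReplica", "OfflineReplica", "ReplicaDeletionIneligible"),
      "OfflineReplica"            -> List("NewReplica", "OnlineReplica", "OfflineReplica", "ReplicaDeletionIneligible"),
      "ReplicaDeletionStarted"    -> List("OfflineReplica"),
      "ReplicaDeletionSuccessful" -> List("ReplicaDeletionStarted"),
      "ReplicaDeletionIneligible" -> List("ReplicaDeletionStarted"),
      "NonExistentReplica"        -> List("ReplicaDeletionSuccessful")
    )

The three transitions below deserve a closer look.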

1. Becoming NewReplica
Essentially this just sends the partition's LeaderAndIsr to the broker hosting the replica.

There is one check, though: if this replica is currently the partition's leader, an exception is thrown. Why? My understanding is that this case should simply never occur, because a replica only becomes NewReplica in two situations: when the partition is created, and during partition reassignment.
If the partition has just been created:

    def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
        info("New partition creation callback for %s".format(newPartitions.mkString(",")))
        partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
        replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
        partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
        replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
      }

As you can see, the replicas are set to NewReplica before the partition is brought Online, so at the moment a replica becomes NewReplica the partition should not yet have a leader.

And if the partition is being reassigned, the partition's current leader is certainly not this new replica.

2. Becoming OnlineReplica
A replica goes Online when its partition is created or when a new broker comes up.
Nothing special needs to happen:
on NewReplica -> Online, the replica is added to the AR; this is the partition-creation path.
For any other state -> Online, usually from onBrokerStartup, it is enough to send the LeaderAndIsr over.

Why is no LeaderAndIsr request needed for NewReplica -> Online? My understanding is that it was already sent on the transition to NewReplica, so there is no need to send it again.

3. Becoming OfflineReplica
Invoked from onBrokerFailure:
send a StopReplica request to the brokers hosting these replicas; remove the replicas from the ISR; and notify the other brokers of the shrunk ISR via a LeaderAndIsr request (a toy sketch follows).
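
A toy illustration of the ISR shrink; the real removeReplicaFromIsr also bumps the leader epoch and writes the updated state back to ZooKeeper:

    val isr = List(1, 2, 3)                         // current in-sync replicas
    val deadReplica = 3
    val shrunkIsr = isr.filterNot(_ == deadReplica) // List(1, 2)
    // the shrunk ISR is then pushed to the remaining assigned replicas via a LeaderAndIsr request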

Original article: https://www.cnblogs.com/fxjwind/p/4939476.html