zoukankan      html  css  js  c++  java
  • Kafka 0.8 Controller设计机制和状态变化

    在kafka集群中,其中一个broker server作为中央控制器Control,负责管理分区和副本状态并执行管理着这些分区的重新分配。

    下面说明如何通过中央控制器操作分区和副本的状态。

    名词解释

    • isr:同步副本组
    • OfflinePartitionLeaderSelector:分区下线后新的领导者选举
    • OAR:老的分配副本
    • PartitionStateChange: 分区状态

    1 PartitionStateChange

    1.1 其有效状态如下:

    • NonExistentPartition: 这种状态表明该分区从来没有创建过或曾经创建过后来又删除了。
    • NewPartition:创建分区后,分区处于NewPartition状态。在这种状态下,还没有Leader/ISR组。
    • OnlinePartition:一旦一个分区Leader被选出,就会处于该状态。
    • OfflinePartition:如果分区Leader成功选举后,当Leader分区崩溃或挂了,分区状态转变为该状态。

    1.2 其有效的状态转移如下:

    • NonExistentPartition -> NewPartition : 集群Controller根据计算规则,从zk中读取分区信息,创建新partition和replica。(?什么样的计算规则?
      1. 首先将第一个可用的副本broker作为leader broker并把所有可用的副本对象都装入ISR,然后写leader和ISR信息到zookeeper中保存
      2. 对于这个分区而言,发送LeaderAndIsr请求到每个可用的副本broker,以及UpdateMetadata请求到每个可用的broker上
    • NewPartition -> OnlinePartition :
      1. 分配第一个alive的副本作为分区leader,并且该分区所有replica作为一个同步复制组(ISR),将这些信息(Leader和ISR数据) 写到zk中。
      2. 对于这个分区,发送LeaderAndIsr请求给每一个replica分区和并发送UpdateMetadata请求到每个活者的broker server。目的是让所有的Broker都拥有全局的partition信息。
    • OnlinePartition,OfflinePartition -> OnlinePartition:
      1. 为该分区选取新的leader和ISR以及接收LeaderAndIsr请求的一组副本,然后写入leader和ISR信息到zookeeper中保存。。
        • OfflinePartitionLeaderSelector
        • ReassignedPartitionLeaderSelector
        • PreferredReplicaPartitionLeaderSelector
        • ControlledShutdownLeaderSelector
      2. 对于这个分区,发送LeaderAndIsr请求给每一个接收副本和UpdateMetadata请求到每个broker server
    • NewPartition,OnlinePartition -> OfflinePartition:表示分区下线状态
    • OfflinePartition -> NonExistentPartition: 表示分区不存在状态

    2 ReplicaStateChange

    2.1 有效状态如下:

    • NewReplica:当创建topic或分区重新分配期间,replica被创建。在这种状态下,副本只能成为Follower。
    • OnlineReplica:一旦此分区一个副本启动且部分分配副本,将处于在线副本状态。在这种状态下,它可以成为Leader或成为Follower
    • OfflineReplica:每当broker server副本宕机或崩溃发生时,如果一个副本崩溃或挂了,它将变为此状态。
    • NonExistentReplica:如果一个副本被删除了,它将变为此状态。

    2.2 有效状态转移如下:

    • NonExistentReplica - - > NewReplica: 使用当前Leader和isr分区发送LeaderAndIsr请求到新副本和UpdateMetadata请求给每一个存活borker
    • NewReplica - > OnlineReplica : 添加新的副本到副本列表中
    • OnlineReplica,OfflineReplica - > OnlineReplica: 使用当前领导者和isr分区发送LeaderAndIsr请求到新副本和UpdateMetadata请求给每一个存活borker
    • NewReplica,OnlineReplica - > OfflineReplica
      1. 发送StopReplicaRequest到相应副本(w / o删除)
      2. 从isr和发送LeaderAndIsr请求重删除此副本(isr)领导者副本和UpdateMetadata分区每个存活broker。
    • OfflineReplica - > NonExistentReplica : 发送StopReplicaRequest到副本(删除)

    3.KafkaController操作:

    3.1 当新建topic时,调用方法onNewPartitionCreation

    3.2 当创建新分区时:

    1. 创建新分区列表 -> 调用方法NewPartition
    2. 创建所有新分区副本 -> 调用方法NewReplica
    3. 新分区在线列表 -> 调用方法OnlinePartition
    4. 新分区所有在线副本 -> OnlineReplica

    3.3 当broker失败或挂掉时:

    1. 当前broker所有领导者分区为下线分区 -> 调用方法OfflinePartition
    2. 下线和在线分区列表 -> OnlinePartition (使用下线分区Leader election)
    3. 在broker上所有fail副本 -> OfflineReplica

    3.4 当broker启动时:

    1. 发送UpdateMetadate请求给新启动broker的所有分区。
    2. 新启动broker的分区副本-> OnlineReplica
    3. 下线和在线分区列表 -> OnlinePartition (使用下线分区领导者选举)
    4. 当新的broker启动时,对于所有分区副本,系统会调用方法onPartitionReassignment执行未完成的分区分配。

    3.5 当分区重新分配时: (OAR: 老的分配副本; RAR:每当重新分配副本会有新的副本组)

    1. 用OAR + RAR副本组修改并分配副本列表.
    2. 当处于OAR + RAR时,发送LeaderAndIsr请求给每个副本。
    3. 副本处于RAR - OAR -> 调用方法NewReplica
    4. 等待直到新的副本加入isr中
    5. 副本处于RAR -> 调用方法OnlineReplica
    6. 设置AR to RAR并写到内存中
    7. send LeaderAndIsr request 给一个潜在领导者 (如果当前领导者不在RAR中)和一个被分配的副本列表(使用RAR) 和相同sir到每个处于RAR的broker中。
    8. replicas in OAR - RAR -> Offline (强制这些副本从isr重剔除)
    9. replicas in OAR - RAR -> NonExistentReplica (强制这些副本被删除)
    10. 在zk上修改重分配副本到RAR中。
    11. 在zk上修改 /admin/reassign_partitions路径,并删除此分区
    12. 选举领导者后,副本和isr信息变化,所以重新发送更新元数据请求给每一个broker。

    3.6 当中央控制器failover时:

    • replicaStateMachine.startup():
      1. 从任何下线副本或上线副本中初始化每个副本
      2. 每个副本 -> OnlineReplica (强制LeaderAndIsr请求发送到每个副本)
    • partitionStateMachine.startup():
      1. 从新建分区中初始化每个分区, 下线或上线分区
      2. each OfflinePartition and NewPartition -> OnlinePartition (强制领导者选举)
    • 恢复分区分配
    • 恢复领导者选举

    3.7 当发送首选副本选举时:

    影响分区列表 -> 调用方法OnlinePartition (with PreferredReplicaPartitionLeaderSelector)

    3.8 关闭broker:

    1. 在关闭broker中对于每个分区如果是领导者分区 -> 调用方法OnlinePartition (ControlledShutdownPartitionLeaderSelector)
    2. 在关闭broker中每个副本是Follower,将发送StopReplica请求 (w/o deletion)
    3. 在关闭broker中每个副本是Follower -> 调用方法OfflineReplica (强制从同步副本组中删除副本)

    4 源码分析

    在KafkaController类中定义了很多属性,我们先重点了解下面的PartitionLeaderSelector对象,主要是为分区选举出leader broker,该trait只定义了一个方法selectLeader,接收一个TopicAndPartition对象和一个LeaderAndIsr对象。TopicAndPartition表示要选leader的分区,而第二个参数表示zookeeper中保存的该分区的当前leader和ISR记录。该方法会返回一个元组包括了推举出来的leader和ISR以及需要接收LeaderAndISr请求的一组副本。

    trait PartitionLeaderSelector {
      /**
       * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
       * the LeaderAndIsrRequest.
       */
      def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
    }
    
    

    通过我们上面的代码,可以看到在KafkaController中共定义了五种selector选举器:

    1. NoOpLeaderSelector
    2. OfflinePartitionLeaderSelector
    3. ReassignedPartitionLeaderSelector
    4. PreferredReplicaPartitionLeaderSelector
    5. ControlledShutdownLeaderSelector

    4.1.ReassignedPartitionLeaderSelector

    从可用的ISR中选取第一个作为leader,把当前的ISR作为新的ISR,将重分配的副本集合作为接收LeaderAndIsr请求的副本集合。

    4.2 PreferredReplicaPartitionLeaderSelector

    如果从assignedReplicas取出的第一个副本就是分区leader的话,则抛出异常,否则将第一个副本设置为分区leader。

    4.3 ControlledShutdownLeaderSelector

    将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本。

    4.4 NoOpLeaderSelector

    原则上不做任何事情,返回当前的leader和isr。

    4.5 OfflinePartitionLeaderSelector

    从活着的ISR中选择一个broker作为leader,如果ISR中没有活着的副本,则从assignedReplicas中选择一个副本作为leader,leader选举成功后注册到Zookeeper中,并更新所有的缓存。

    所有的leader选择完成后,都要通过请求把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,而是每个handler都是一个函数,混在KafkaApi类中

    /**
       * Top-level method that handles all requests and multiplexes to the right api
       */
      def handle(request: RequestChannel.Request) {
          trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
            format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
          ApiKeys.forId(request.requestId) match {
            case ApiKeys.PRODUCE => handleProducerRequest(request)
            case ApiKeys.FETCH => handleFetchRequest(request)
            case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
            case ApiKeys.METADATA => handleTopicMetadataRequest(request)
            case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
            case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
            case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
            case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
            case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
            case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
            case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
            case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
            case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
            case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
            case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
            case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
            case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
            case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
            case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
            case requestId => throw new KafkaException("Unknown api code " + requestId)
          }
        }
    

    4.6 case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)

    这个方法让其成为follower还是leader,得调用下面这个方法,
    kafka.server.ReplicaManager#becomeLeaderOrFollower:流程图如下。
    image

    def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest,
                                 metadataCache: MetadataCache,
                                 onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                    .format(localBrokerId, stateInfo, correlationId,
                                            leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
        }
        replicaStateChangeLock synchronized {
          val responseMap = new mutable.HashMap[TopicPartition, Short]
          if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
            leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
              "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
              correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
            }
            BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
          } else {
            val controllerId = leaderAndISRRequest.controllerId
            controllerEpoch = leaderAndISRRequest.controllerEpoch
    
            // First check partition's leader epoch
            val partitionState = new mutable.HashMap[Partition, PartitionState]()
            leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
              val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
              val partitionLeaderEpoch = partition.getLeaderEpoch()
              // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
              // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
              if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
                if(stateInfo.replicas.contains(config.brokerId))
                  partitionState.put(partition, stateInfo)
                else {
                  stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                    "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                    .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                      topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
                }
              } else {
                // Otherwise record the error code in response
                stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                  "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
                  .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                    topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
              }
            }
    
            val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
              stateInfo.leader == config.brokerId
            }
            val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
    
            val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
              makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
            else
              Set.empty[Partition]
            val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
              makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
            else
              Set.empty[Partition]
    
            // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
            // have been completely populated before starting the checkpointing there by avoiding weird race conditions
            if (!hwThreadInitialized) {
              startHighWaterMarksCheckPointThread()
              hwThreadInitialized = true
            }
            replicaFetcherManager.shutdownIdleFetcherThreads()
    
            onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
            BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
          }
        }
      }
    
    代码解释:
    1. 如果请求中controllerEpoch小于当前最新的controllerEpoch,则直接返回ErrorMapping.StaleControllerEpochCode。

    2. 如果partitionStateInfo中的leader epoch大于当前ReplicManager中存储的(topic, partitionId)对应的partition的leader epoch,则:

      2.1 如果当前brokerid(或者说replica id)在partitionStateInfo中,则将该partition及partitionStateInfo存入一个名为partitionState的HashMap中。
      否则说明该Broker不在该Partition分配的Replica list中,将该信息记录于log中

    3. 如果partitionStateInfo中的leader epoch小于当前ReplicManager则将相应的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中。

    4. 筛选出partitionState中Leader与当前Broker ID相等的所有记录存入partitionsTobeLeader中,其它记录存入partitionsToBeFollower中。

      • 如果partitionsTobeLeader不为空,则对其执行makeLeaders方。
      • 如果partitionsToBeFollower不为空,则对其执行makeFollowers方法。
  • 相关阅读:
    mongodb入门安装与配置
    mssql export db
    初识django
    git
    水晶报表的使用经验和资料总结
    SQL中CONVERT转化函数的用法▲
    生活
    SQL中的临时表和表变量
    Convert Datetime to String in Sql Server
    转:探讨SQL Server 2005的安全策略
  • 原文地址:https://www.cnblogs.com/byrhuangqiang/p/6367849.html
Copyright © 2011-2022 走看看