zoukankan      html  css  js  c++  java
  • Kafka 0.8 Controller设计机制和状态变化

    在kafka集群中,其中一个broker server作为中央控制器Control,负责管理分区和副本状态并执行管理着这些分区的重新分配。

    下面说明如何通过中央控制器操作分区和副本的状态。

    名词解释

    • isr:同步副本组
    • OfflinePartitionLeaderSelector:分区下线后新的领导者选举
    • OAR:老的分配副本
    • PartitionStateChange: 分区状态

    1 PartitionStateChange

    1.1 其有效状态如下:

    • NonExistentPartition: 这种状态表明该分区从来没有创建过或曾经创建过后来又删除了。
    • NewPartition:创建分区后,分区处于NewPartition状态。在这种状态下,还没有Leader/ISR组。
    • OnlinePartition:一旦一个分区Leader被选出,就会处于该状态。
    • OfflinePartition:如果分区Leader成功选举后,当Leader分区崩溃或挂了,分区状态转变为该状态。

    1.2 其有效的状态转移如下:

    • NonExistentPartition -> NewPartition : 集群Controller根据计算规则,从zk中读取分区信息,创建新partition和replica。(?什么样的计算规则?
      1. 首先将第一个可用的副本broker作为leader broker并把所有可用的副本对象都装入ISR,然后写leader和ISR信息到zookeeper中保存
      2. 对于这个分区而言,发送LeaderAndIsr请求到每个可用的副本broker,以及UpdateMetadata请求到每个可用的broker上
    • NewPartition -> OnlinePartition :
      1. 分配第一个alive的副本作为分区leader,并且该分区所有replica作为一个同步复制组(ISR),将这些信息(Leader和ISR数据) 写到zk中。
      2. 对于这个分区,发送LeaderAndIsr请求给每一个replica分区和并发送UpdateMetadata请求到每个活者的broker server。目的是让所有的Broker都拥有全局的partition信息。
    • OnlinePartition,OfflinePartition -> OnlinePartition:
      1. 为该分区选取新的leader和ISR以及接收LeaderAndIsr请求的一组副本,然后写入leader和ISR信息到zookeeper中保存。。
        • OfflinePartitionLeaderSelector
        • ReassignedPartitionLeaderSelector
        • PreferredReplicaPartitionLeaderSelector
        • ControlledShutdownLeaderSelector
      2. 对于这个分区,发送LeaderAndIsr请求给每一个接收副本和UpdateMetadata请求到每个broker server
    • NewPartition,OnlinePartition -> OfflinePartition:表示分区下线状态
    • OfflinePartition -> NonExistentPartition: 表示分区不存在状态

    2 ReplicaStateChange

    2.1 有效状态如下:

    • NewReplica:当创建topic或分区重新分配期间,replica被创建。在这种状态下,副本只能成为Follower。
    • OnlineReplica:一旦此分区一个副本启动且部分分配副本,将处于在线副本状态。在这种状态下,它可以成为Leader或成为Follower
    • OfflineReplica:每当broker server副本宕机或崩溃发生时,如果一个副本崩溃或挂了,它将变为此状态。
    • NonExistentReplica:如果一个副本被删除了,它将变为此状态。

    2.2 有效状态转移如下:

    • NonExistentReplica - - > NewReplica: 使用当前Leader和isr分区发送LeaderAndIsr请求到新副本和UpdateMetadata请求给每一个存活borker
    • NewReplica - > OnlineReplica : 添加新的副本到副本列表中
    • OnlineReplica,OfflineReplica - > OnlineReplica: 使用当前领导者和isr分区发送LeaderAndIsr请求到新副本和UpdateMetadata请求给每一个存活borker
    • NewReplica,OnlineReplica - > OfflineReplica
      1. 发送StopReplicaRequest到相应副本(w / o删除)
      2. 从isr和发送LeaderAndIsr请求重删除此副本(isr)领导者副本和UpdateMetadata分区每个存活broker。
    • OfflineReplica - > NonExistentReplica : 发送StopReplicaRequest到副本(删除)

    3.KafkaController操作:

    3.1 当新建topic时,调用方法onNewPartitionCreation

    3.2 当创建新分区时:

    1. 创建新分区列表 -> 调用方法NewPartition
    2. 创建所有新分区副本 -> 调用方法NewReplica
    3. 新分区在线列表 -> 调用方法OnlinePartition
    4. 新分区所有在线副本 -> OnlineReplica

    3.3 当broker失败或挂掉时:

    1. 当前broker所有领导者分区为下线分区 -> 调用方法OfflinePartition
    2. 下线和在线分区列表 -> OnlinePartition (使用下线分区Leader election)
    3. 在broker上所有fail副本 -> OfflineReplica

    3.4 当broker启动时:

    1. 发送UpdateMetadate请求给新启动broker的所有分区。
    2. 新启动broker的分区副本-> OnlineReplica
    3. 下线和在线分区列表 -> OnlinePartition (使用下线分区领导者选举)
    4. 当新的broker启动时,对于所有分区副本,系统会调用方法onPartitionReassignment执行未完成的分区分配。

    3.5 当分区重新分配时: (OAR: 老的分配副本; RAR:每当重新分配副本会有新的副本组)

    1. 用OAR + RAR副本组修改并分配副本列表.
    2. 当处于OAR + RAR时,发送LeaderAndIsr请求给每个副本。
    3. 副本处于RAR - OAR -> 调用方法NewReplica
    4. 等待直到新的副本加入isr中
    5. 副本处于RAR -> 调用方法OnlineReplica
    6. 设置AR to RAR并写到内存中
    7. send LeaderAndIsr request 给一个潜在领导者 (如果当前领导者不在RAR中)和一个被分配的副本列表(使用RAR) 和相同sir到每个处于RAR的broker中。
    8. replicas in OAR - RAR -> Offline (强制这些副本从isr重剔除)
    9. replicas in OAR - RAR -> NonExistentReplica (强制这些副本被删除)
    10. 在zk上修改重分配副本到RAR中。
    11. 在zk上修改 /admin/reassign_partitions路径,并删除此分区
    12. 选举领导者后,副本和isr信息变化,所以重新发送更新元数据请求给每一个broker。

    3.6 当中央控制器failover时:

    • replicaStateMachine.startup():
      1. 从任何下线副本或上线副本中初始化每个副本
      2. 每个副本 -> OnlineReplica (强制LeaderAndIsr请求发送到每个副本)
    • partitionStateMachine.startup():
      1. 从新建分区中初始化每个分区, 下线或上线分区
      2. each OfflinePartition and NewPartition -> OnlinePartition (强制领导者选举)
    • 恢复分区分配
    • 恢复领导者选举

    3.7 当发送首选副本选举时:

    影响分区列表 -> 调用方法OnlinePartition (with PreferredReplicaPartitionLeaderSelector)

    3.8 关闭broker:

    1. 在关闭broker中对于每个分区如果是领导者分区 -> 调用方法OnlinePartition (ControlledShutdownPartitionLeaderSelector)
    2. 在关闭broker中每个副本是Follower,将发送StopReplica请求 (w/o deletion)
    3. 在关闭broker中每个副本是Follower -> 调用方法OfflineReplica (强制从同步副本组中删除副本)

    4 源码分析

    在KafkaController类中定义了很多属性,我们先重点了解下面的PartitionLeaderSelector对象,主要是为分区选举出leader broker,该trait只定义了一个方法selectLeader,接收一个TopicAndPartition对象和一个LeaderAndIsr对象。TopicAndPartition表示要选leader的分区,而第二个参数表示zookeeper中保存的该分区的当前leader和ISR记录。该方法会返回一个元组包括了推举出来的leader和ISR以及需要接收LeaderAndISr请求的一组副本。

    trait PartitionLeaderSelector {
      /**
       * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
       * the LeaderAndIsrRequest.
       */
      def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
    }
    
    

    通过我们上面的代码,可以看到在KafkaController中共定义了五种selector选举器:

    1. NoOpLeaderSelector
    2. OfflinePartitionLeaderSelector
    3. ReassignedPartitionLeaderSelector
    4. PreferredReplicaPartitionLeaderSelector
    5. ControlledShutdownLeaderSelector

    4.1.ReassignedPartitionLeaderSelector

    从可用的ISR中选取第一个作为leader,把当前的ISR作为新的ISR,将重分配的副本集合作为接收LeaderAndIsr请求的副本集合。

    4.2 PreferredReplicaPartitionLeaderSelector

    如果从assignedReplicas取出的第一个副本就是分区leader的话,则抛出异常,否则将第一个副本设置为分区leader。

    4.3 ControlledShutdownLeaderSelector

    将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本。

    4.4 NoOpLeaderSelector

    原则上不做任何事情,返回当前的leader和isr。

    4.5 OfflinePartitionLeaderSelector

    从活着的ISR中选择一个broker作为leader,如果ISR中没有活着的副本,则从assignedReplicas中选择一个副本作为leader,leader选举成功后注册到Zookeeper中,并更新所有的缓存。

    所有的leader选择完成后,都要通过请求把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,而是每个handler都是一个函数,混在KafkaApi类中

    /**
       * Top-level method that handles all requests and multiplexes to the right api
       */
      def handle(request: RequestChannel.Request) {
          trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
            format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
          ApiKeys.forId(request.requestId) match {
            case ApiKeys.PRODUCE => handleProducerRequest(request)
            case ApiKeys.FETCH => handleFetchRequest(request)
            case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
            case ApiKeys.METADATA => handleTopicMetadataRequest(request)
            case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
            case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
            case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
            case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
            case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
            case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
            case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
            case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
            case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
            case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
            case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
            case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
            case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
            case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
            case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
            case requestId => throw new KafkaException("Unknown api code " + requestId)
          }
        }
    

    4.6 case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)

    这个方法让其成为follower还是leader,得调用下面这个方法,
    kafka.server.ReplicaManager#becomeLeaderOrFollower:流程图如下。
    image

    def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest,
                                 metadataCache: MetadataCache,
                                 onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                    .format(localBrokerId, stateInfo, correlationId,
                                            leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
        }
        replicaStateChangeLock synchronized {
          val responseMap = new mutable.HashMap[TopicPartition, Short]
          if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
            leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
              "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
              correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
            }
            BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
          } else {
            val controllerId = leaderAndISRRequest.controllerId
            controllerEpoch = leaderAndISRRequest.controllerEpoch
    
            // First check partition's leader epoch
            val partitionState = new mutable.HashMap[Partition, PartitionState]()
            leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
              val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
              val partitionLeaderEpoch = partition.getLeaderEpoch()
              // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
              // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
              if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
                if(stateInfo.replicas.contains(config.brokerId))
                  partitionState.put(partition, stateInfo)
                else {
                  stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                    "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                    .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                      topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
                }
              } else {
                // Otherwise record the error code in response
                stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                  "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
                  .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                    topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
              }
            }
    
            val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
              stateInfo.leader == config.brokerId
            }
            val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
    
            val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
              makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
            else
              Set.empty[Partition]
            val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
              makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
            else
              Set.empty[Partition]
    
            // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
            // have been completely populated before starting the checkpointing there by avoiding weird race conditions
            if (!hwThreadInitialized) {
              startHighWaterMarksCheckPointThread()
              hwThreadInitialized = true
            }
            replicaFetcherManager.shutdownIdleFetcherThreads()
    
            onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
            BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
          }
        }
      }
    
    代码解释:
    1. 如果请求中controllerEpoch小于当前最新的controllerEpoch,则直接返回ErrorMapping.StaleControllerEpochCode。

    2. 如果partitionStateInfo中的leader epoch大于当前ReplicManager中存储的(topic, partitionId)对应的partition的leader epoch,则:

      2.1 如果当前brokerid(或者说replica id)在partitionStateInfo中,则将该partition及partitionStateInfo存入一个名为partitionState的HashMap中。
      否则说明该Broker不在该Partition分配的Replica list中,将该信息记录于log中

    3. 如果partitionStateInfo中的leader epoch小于当前ReplicManager则将相应的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中。

    4. 筛选出partitionState中Leader与当前Broker ID相等的所有记录存入partitionsTobeLeader中,其它记录存入partitionsToBeFollower中。

      • 如果partitionsTobeLeader不为空,则对其执行makeLeaders方。
      • 如果partitionsToBeFollower不为空,则对其执行makeFollowers方法。
  • 相关阅读:
    Lucene.Net 2.3.1开发介绍 —— 二、分词(一)
    控制‘控制台应用程序’的关闭操作
    详解for循环(各种用法)
    敏捷软件开发
    Sql Server的一些知识点
    在SharePoint 2010 中配置Remote Blob Storage FILESTREAM Provider
    使用LotusScript操作Lotus Notes RTF域
    JOpt Simple 4.5 发布,命令行解析器
    John the Ripper 1.8.0 发布,密码破解工具
    PacketFence ZEN 4.0.1 发布,网络接入控制
  • 原文地址:https://www.cnblogs.com/byrhuangqiang/p/6367849.html
Copyright © 2011-2022 走看看