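In a Kafka cluster, one broker server acts as the central controller. It manages the state of partitions and replicas and performs administrative tasks such as reassigning partitions.
The following describes how the controller manipulates partition and replica states.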
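Glossary
- isr: in-sync replicas, the set of replicas that are caught up with the leader
- OfflinePartitionLeaderSelector: elects a new leader after a partition goes offline
- OAR: old assigned replicas
- PartitionStateChange: partition state transitions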
1 PartitionStateChange
1.1 Valid states:
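- NonExistentPartition: the partition was never created, or was created and later deleted.
- NewPartition: right after creation, a partition is in the NewPartition state. In this state it has no leader/ISR yet.
- OnlinePartition: once a leader has been elected for the partition, it is in this state.
- OfflinePartition: if the leader dies after a successful election, the partition moves to this state.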
1.2 Valid state transitions:
- NonExistentPartition -> NewPartition: the controller loads the partition's assigned replicas from ZooKeeper into its cache.
- NewPartition -> OnlinePartition:
  - assign the first live replica as the partition leader and all live replicas as the ISR, then write the leader and ISR to ZooKeeper
  - for this partition, send a LeaderAndIsr request to every live replica and an UpdateMetadata request to every live broker, so that every broker has the cluster-wide partition metadata
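- OnlinePartition, OfflinePartition -> OnlinePartition:
  - select a new leader and ISR for the partition, plus the set of replicas that should receive the LeaderAndIsr request, then write the leader and ISR to ZooKeeper. Depending on the trigger, one of these selectors is used:
    - OfflinePartitionLeaderSelector
    - ReassignedPartitionLeaderSelector
    - PreferredReplicaPartitionLeaderSelector
    - ControlledShutdownLeaderSelector
  - for this partition, send a LeaderAndIsr request to every receiving replica and an UpdateMetadata request to every live broker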
- NewPartition, OnlinePartition -> OfflinePartition: the partition is simply marked as OfflinePartition.
- OfflinePartition -> NonExistentPartition: the partition is simply marked as NonExistentPartition.
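The transitions in 1.2 can be read as a table of legal previous states for each target state. The minimal Scala sketch below encodes that table as a guard; `PartitionTransitions`, `isValid` and `transition` are hypothetical names chosen for illustration, not Kafka's API.

```scala
// Partition states from section 1.1.
sealed trait PartitionState
case object NonExistentPartition extends PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

object PartitionTransitions {
  // For each target state, the states it may legally be entered from (section 1.2).
  val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
    NewPartition         -> Set[PartitionState](NonExistentPartition),
    OnlinePartition      -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition     -> Set[PartitionState](NewPartition, OnlinePartition),
    NonExistentPartition -> Set[PartitionState](OfflinePartition)
  )

  def isValid(from: PartitionState, to: PartitionState): Boolean =
    validPreviousStates.getOrElse(to, Set.empty[PartitionState]).contains(from)

  // Rejects any transition that is not in the table above.
  def transition(from: PartitionState, to: PartitionState): PartitionState = {
    require(isValid(from, to), s"Illegal partition state transition $from -> $to")
    to
  }
}
```

For example, `PartitionTransitions.transition(NewPartition, OnlinePartition)` succeeds, while jumping straight from `NonExistentPartition` to `OnlinePartition` is rejected because a partition must pass through NewPartition first.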
2 ReplicaStateChange
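2.1 Valid states:
- NewReplica: a replica is created in this state during topic creation or partition reassignment. In this state the replica can only become a follower.
- OnlineReplica: once a replica has started and is part of the assigned replicas of its partition, it is in this state. In this state it can become either the leader or a follower.
- OfflineReplica: when a replica dies, which happens when the broker hosting it goes down or crashes, it moves to this state.
- NonExistentReplica: when a replica is deleted, it moves to this state.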
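2.2 Valid state transitions:
- NonExistentReplica -> NewReplica: send a LeaderAndIsr request carrying the partition's current leader and ISR to the new replica, and an UpdateMetadata request to every live broker
- NewReplica -> OnlineReplica: add the new replica to the assigned replica list
- OnlineReplica, OfflineReplica -> OnlineReplica: send a LeaderAndIsr request carrying the partition's current leader and ISR to the replica, and an UpdateMetadata request to every live broker
- NewReplica, OnlineReplica -> OfflineReplica:
  - send a StopReplica request to the replica (w/o deletion)
  - remove this replica from the ISR, send a LeaderAndIsr request with the new ISR to the leader replica, and send an UpdateMetadata request for the partition to every live broker
- OfflineReplica -> NonExistentReplica: send a StopReplica request to the replica (with deletion)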
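The ISR-shrinking step of the NewReplica, OnlineReplica -> OfflineReplica transition can be sketched as a pure function. This is a sketch under stated assumptions: `LeaderAndIsr` here is a simplified, hypothetical case class (Kafka's real one also carries a ZooKeeper version), `-1` is assumed to mean "no leader", and the leader epoch is assumed to be bumped on every ISR change.

```scala
// Hypothetical, simplified leader/ISR record.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

object OfflineReplicaSketch {
  // Assumption: -1 marks "no leader".
  val NoLeader = -1

  // New record after `replicaId` goes offline, or None if it was not in the ISR anyway.
  def removeFromIsr(current: LeaderAndIsr, replicaId: Int): Option[LeaderAndIsr] =
    if (!current.isr.contains(replicaId)) None
    else {
      val newIsr = current.isr.filterNot(_ == replicaId)
      // If the leader itself went offline, the partition is left without a leader.
      val newLeader = if (current.leader == replicaId) NoLeader else current.leader
      // Assumption: every ISR change bumps the leader epoch.
      Some(LeaderAndIsr(newLeader, current.leaderEpoch + 1, newIsr))
    }
}
```

In the controller, the resulting record would then be written to ZooKeeper and sent to the leader in a LeaderAndIsr request; if the removed replica was the leader, the partition still needs a new leader.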
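3 KafkaController operations:
3.1 When a new topic is created: the controller calls onNewPartitionCreation, which performs the steps in 3.2.
3.2 When new partitions are created:
- new partitions -> NewPartition
- all replicas of the new partitions -> NewReplica
- new partitions -> OnlinePartition
- all replicas of the new partitions -> OnlineReplica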
3.3 When a broker fails or goes down:
- partitions whose leader was on the failed broker -> OfflinePartition
- offline and new partitions -> OnlinePartition (using OfflinePartitionLeaderSelector)
- every replica on the failed broker -> OfflineReplica
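The first and last steps of 3.3 boil down to set computations over the controller's cache. A minimal sketch, assuming a hypothetical `ClusterView` that maps each partition (named with strings such as `"t-0"`) to its current leader and to its assigned replicas:

```scala
// Hypothetical, simplified view of the controller's cache.
case class ClusterView(
  leaders: Map[String, Int],          // partition -> current leader broker id
  assignments: Map[String, List[Int]] // partition -> assigned replica broker ids
)

object BrokerFailureSketch {
  // Partitions whose leader lived on a failed broker: these go to OfflinePartition first.
  def partitionsToOffline(view: ClusterView, deadBrokers: Set[Int]): Set[String] =
    view.leaders.collect { case (p, leader) if deadBrokers.contains(leader) => p }.toSet

  // Every (partition, replica) pair hosted on a failed broker: these go to OfflineReplica.
  // toList first, so several dead replicas of the same partition are not collapsed by a Map.
  def replicasToOffline(view: ClusterView, deadBrokers: Set[Int]): Set[(String, Int)] =
    view.assignments.toList.flatMap { case (p, replicas) =>
      replicas.filter(deadBrokers.contains).map(r => (p, r))
    }.toSet
}
```

Between these two steps, the offline and new partitions are moved to OnlinePartition with OfflinePartitionLeaderSelector.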
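3.4 When a broker starts up:
- send an UpdateMetadata request for all partitions to the newly started broker
- replicas on the newly started broker -> OnlineReplica
- offline and new partitions -> OnlinePartition (using OfflinePartitionLeaderSelector)
- for partitions with replicas on the newly started broker, call onPartitionReassignment to complete any outstanding partition reassignment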
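3.5 When a partition is reassigned (OAR: old assigned replicas; RAR: the new set of reassigned replicas; AR: assigned replicas):
- update the assigned replica list to OAR + RAR
- send a LeaderAndIsr request to every replica in OAR + RAR
- replicas in RAR - OAR -> NewReplica
- wait until the replicas in RAR join the ISR
- replicas in RAR -> OnlineReplica
- set AR to RAR in memory
- send a LeaderAndIsr request with a potential new leader (if the current leader is not in RAR), the new assigned replica list (RAR) and the same ISR to every broker in RAR
- replicas in OAR - RAR -> OfflineReplica (forcing those replicas out of the ISR)
- replicas in OAR - RAR -> NonExistentReplica (forcing those replicas to be deleted)
- update AR in ZooKeeper to RAR
- update the /admin/reassign_partitions path in ZooKeeper to remove this partition
- after the leader election the replica and ISR information has changed, so resend an UpdateMetadata request to every broker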
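The set arithmetic in 3.5 can be made concrete with a small sketch. `ReassignmentSketch` and its `Plan` fields are hypothetical; the sketch only computes which replicas land in which step and leaves out the requests and ZooKeeper writes.

```scala
object ReassignmentSketch {
  case class Plan(
    interimAssignment: List[Int], // AR during the move: OAR + RAR
    toNewReplica: List[Int],      // RAR - OAR: started as NewReplica
    toOffline: List[Int],         // OAR - RAR: forced out of the ISR, then deleted
    finalAssignment: List[Int],   // AR after the move: RAR
    needsNewLeader: Boolean       // true if the current leader is not in RAR
  )

  def plan(oar: List[Int], rar: List[Int], currentLeader: Int): Plan = Plan(
    interimAssignment = (oar ++ rar).distinct,
    toNewReplica = rar.filterNot(r => oar.contains(r)),
    toOffline = oar.filterNot(r => rar.contains(r)),
    finalAssignment = rar,
    needsNewLeader = !rar.contains(currentLeader)
  )

  // "Wait until the replicas in RAR join the ISR": only then may the old replicas be dropped.
  def readyToComplete(rar: List[Int], isr: Set[Int]): Boolean = rar.forall(isr.contains)
}
```

For OAR = (1, 2, 3) and RAR = (3, 4, 5), replicas 4 and 5 start as NewReplica, replicas 1 and 2 are eventually taken offline and deleted, and a leader on broker 1 forces a new leader election.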
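3.6 When the controller fails over:
- replicaStateMachine.startup():
  - initialize each replica as either OfflineReplica or OnlineReplica
  - each replica -> OnlineReplica (forcing a LeaderAndIsr request to be sent to every replica)
- partitionStateMachine.startup():
  - initialize each partition as NewPartition, OfflinePartition or OnlinePartition
  - each OfflinePartition and NewPartition -> OnlinePartition (forcing leader election)
- resume partition reassignment
- resume preferred replica leader election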
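The partition initialization in partitionStateMachine.startup() can be sketched as follows. The rule is an assumption of this sketch: no leader/ISR recorded in ZooKeeper means NewPartition, a recorded leader on a live broker means OnlinePartition, and a recorded leader on a dead broker means OfflinePartition. `FailoverInitSketch` is a hypothetical name.

```scala
sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

object FailoverInitSketch {
  // leaderInZk: the leader recorded in ZooKeeper, or None if no leader/ISR was written yet.
  def initialState(leaderInZk: Option[Int], liveBrokers: Set[Int]): PartitionState = leaderInZk match {
    case None                                          => NewPartition
    case Some(leader) if liveBrokers.contains(leader) => OnlinePartition
    case Some(_)                                       => OfflinePartition
  }

  // After initialization, every New/Offline partition is pushed to OnlinePartition (forcing an election).
  def needsElection(state: PartitionState): Boolean = state != OnlinePartition
}
```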
3.7 When a preferred replica election is triggered:
- affected partitions -> OnlinePartition (with PreferredReplicaPartitionLeaderSelector)
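3.8 Controlled shutdown of a broker:
- each partition whose leader is on the shutting-down broker -> OnlinePartition (with ControlledShutdownLeaderSelector)
- for each follower replica on the shutting-down broker, send a StopReplica request (w/o deletion)
- each follower replica on the shutting-down broker -> OfflineReplica (forcing the replica out of the ISR)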
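4 Source code analysis
KafkaController defines many fields. We first focus on PartitionLeaderSelector, which elects the leader broker for a partition. This trait defines a single method, selectLeader, which takes a TopicAndPartition and a LeaderAndIsr. The TopicAndPartition identifies the partition that needs a leader; the LeaderAndIsr is the partition's current leader and ISR as stored in ZooKeeper. The method returns a tuple of the newly selected leader and ISR plus the set of replicas that should receive the LeaderAndIsr request.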
```scala
trait PartitionLeaderSelector {
  /**
   * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
   *         the LeaderAndIsrRequest.
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
```
There are five implementations of this trait:
- NoOpLeaderSelector
- OfflinePartitionLeaderSelector
- ReassignedPartitionLeaderSelector
- PreferredReplicaPartitionLeaderSelector
- ControlledShutdownLeaderSelector
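4.1 ReassignedPartitionLeaderSelector
Picks as leader the first reassigned replica that is alive and in the current ISR, keeps the current ISR as the new ISR, and uses the reassigned replica set as the replicas that receive the LeaderAndIsr request.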
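A minimal sketch of this selector, assuming a simplified, hypothetical `LeaderAndIsr` case class (no ZooKeeper version) and that the leader epoch is incremented on each election. Candidates are the reassigned replicas that are both alive and in the current ISR; if there are none, the sketch throws.

```scala
// Hypothetical, simplified leader/ISR record.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

object ReassignedSelectorSketch {
  // Returns (new leader/ISR, replicas that should receive the LeaderAndIsr request).
  def selectLeader(current: LeaderAndIsr, reassigned: List[Int], liveBrokers: Set[Int]): (LeaderAndIsr, List[Int]) = {
    val candidates = reassigned.filter(r => liveBrokers.contains(r) && current.isr.contains(r))
    candidates.headOption match {
      case Some(newLeader) =>
        // The ISR is kept as-is; only the leader (and epoch) changes.
        (LeaderAndIsr(newLeader, current.leaderEpoch + 1, current.isr), reassigned)
      case None =>
        throw new IllegalStateException("no live reassigned replica is in the ISR")
    }
  }
}
```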
4.2 PreferredReplicaPartitionLeaderSelector
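The preferred replica is the first replica in assignedReplicas. If it is already the partition leader, the selector throws an exception; otherwise it makes the preferred replica the leader, provided the preferred replica is alive and in the ISR (if not, it also throws).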
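A sketch of the preferred-replica rule, using the same kind of simplified, hypothetical `LeaderAndIsr`. Besides the "already the leader" check, it assumes the preferred replica must be alive and in the ISR, keeps the ISR unchanged, and sends the request to the assigned replicas.

```scala
// Hypothetical, simplified leader/ISR record.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

object PreferredSelectorSketch {
  def selectLeader(current: LeaderAndIsr, assigned: List[Int], liveBrokers: Set[Int]): (LeaderAndIsr, List[Int]) = {
    val preferred = assigned.head // the first assigned replica is the preferred one
    if (preferred == current.leader)
      throw new IllegalStateException(s"preferred replica $preferred is already the leader")
    else if (!liveBrokers.contains(preferred) || !current.isr.contains(preferred))
      throw new IllegalStateException(s"preferred replica $preferred is not alive or not in the ISR")
    else
      (LeaderAndIsr(preferred, current.leaderEpoch + 1, current.isr), assigned)
  }
}
```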
4.3 ControlledShutdownLeaderSelector
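Removes the replicas on brokers that are shutting down from the ISR to obtain a new ISR, picks the first replica of that new ISR as the leader, and uses the current assigned replicas (AR) as the replicas that receive the LeaderAndIsr request.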
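A sketch following the description above: drop shutting-down brokers from the ISR, take the first remaining member as leader, and send the request to the current AR. The types and names are hypothetical simplifications, and the epoch bump is an assumption of the sketch.

```scala
// Hypothetical, simplified leader/ISR record.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

object ControlledShutdownSelectorSketch {
  def selectLeader(current: LeaderAndIsr, assigned: List[Int], shuttingDown: Set[Int]): (LeaderAndIsr, List[Int]) = {
    // Remove replicas on brokers that are being shut down from the ISR.
    val newIsr = current.isr.filterNot(shuttingDown.contains)
    newIsr.headOption match {
      case Some(newLeader) => (LeaderAndIsr(newLeader, current.leaderEpoch + 1, newIsr), assigned)
      case None            => throw new IllegalStateException("every ISR member is shutting down")
    }
  }
}
```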
4.4 NoOpLeaderSelector
Does nothing: it returns the current leader and ISR unchanged.
4.5 OfflinePartitionLeaderSelector
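Picks a live broker from the ISR as the leader. If no ISR replica is alive, it picks a live replica from assignedReplicas instead, but only when unclean leader election is enabled (unclean.leader.election.enable); otherwise the election fails. Once a leader is elected, it is registered in ZooKeeper and all caches are updated.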
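A sketch of the offline election, with the non-ISR fallback gated by a boolean flag standing in for `unclean.leader.election.enable`. The simplified `LeaderAndIsr`, the epoch bump, and sending the request to the live assigned replicas are assumptions of this sketch.

```scala
// Hypothetical, simplified leader/ISR record.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

object OfflineSelectorSketch {
  def selectLeader(current: LeaderAndIsr, assigned: List[Int], liveBrokers: Set[Int],
                   uncleanElectionEnabled: Boolean): (LeaderAndIsr, List[Int]) = {
    val liveAssigned = assigned.filter(liveBrokers.contains)
    val liveIsr = current.isr.filter(liveBrokers.contains)
    val (newLeader, newIsr) = liveAssigned.find(r => liveIsr.contains(r)) match {
      // Clean election: the leader comes from the live ISR, which becomes the new ISR.
      case Some(leader) => (leader, liveIsr)
      // Unclean election: any live replica may lead; the ISR restarts with just that replica.
      case None if uncleanElectionEnabled && liveAssigned.nonEmpty =>
        (liveAssigned.head, List(liveAssigned.head))
      case None =>
        throw new IllegalStateException("no eligible live replica to elect")
    }
    (LeaderAndIsr(newLeader, current.leaderEpoch + 1, newIsr), liveAssigned)
  }
}
```

The unclean branch trades durability for availability: a replica outside the ISR may lack messages the old leader had acknowledged.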
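Once leaders are chosen, the controller sends the corresponding requests to the brokers, and each broker routes every incoming request to its handler. Kafka does not abstract handlers into separate classes: each handler is a function in the KafkaApis class, dispatched by KafkaApis.handle: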
```scala
/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
def handle(request: RequestChannel.Request) {
  trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
    format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
  ApiKeys.forId(request.requestId) match {
    case ApiKeys.PRODUCE => handleProducerRequest(request)
    case ApiKeys.FETCH => handleFetchRequest(request)
    case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
    case ApiKeys.METADATA => handleTopicMetadataRequest(request)
    case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
    case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
    case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
    case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
    case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
    case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
    case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
    case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
    case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
    case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
    case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
    case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
    case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
    case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
    case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
    case requestId => throw new KafkaException("Unknown api code " + requestId)
  }
}
```
4.6 case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
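To decide whether this broker becomes the leader or a follower for each partition in the request, handleLeaderAndIsrRequest calls kafka.server.ReplicaManager#becomeLeaderOrFollower, shown below: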
```scala
def becomeLeaderOrFollower(correlationId: Int, leaderAndISRRequest: LeaderAndIsrRequest,
                           metadataCache: MetadataCache,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
  leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
    stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
      .format(localBrokerId, stateInfo, correlationId,
        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
  }
  replicaStateChangeLock synchronized {
    val responseMap = new mutable.HashMap[TopicPartition, Short]
    if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
      leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
          "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
          correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
      }
      BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
    } else {
      val controllerId = leaderAndISRRequest.controllerId
      controllerEpoch = leaderAndISRRequest.controllerEpoch
      // First check partition's leader epoch
      val partitionState = new mutable.HashMap[Partition, PartitionState]()
      leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
        val partitionLeaderEpoch = partition.getLeaderEpoch()
        // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
        // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
        if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
          if (stateInfo.replicas.contains(config.brokerId))
            partitionState.put(partition, stateInfo)
          else {
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
              "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
            responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
          }
        } else {
          // Otherwise record the error code in response
          stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
            "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
            .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
              topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
          responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
        }
      }
      val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
        stateInfo.leader == config.brokerId
      }
      val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
      val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
      else
        Set.empty[Partition]
      val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
      else
        Set.empty[Partition]
      // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
      // have been completely populated before starting the checkpointing there by avoiding weird race conditions
      if (!hwThreadInitialized) {
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }
      replicaFetcherManager.shutdownIdleFetcherThreads()
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
      BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
    }
  }
}
```
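Code walkthrough:
- If the request's controllerEpoch is smaller than the latest controllerEpoch known to this broker, the whole request is ignored and Errors.STALE_CONTROLLER_EPOCH (ErrorMapping.StaleControllerEpochCode in older versions) is returned.
- Otherwise, for each partition, if the leader epoch in the request (stateInfo.leaderEpoch) is greater than the leader epoch the local ReplicaManager holds for that (topic, partition):
  - if this broker's id (its replica id) is in the partition's replica list, the partition and its state are stored in the partitionState HashMap;
  - otherwise this broker is not in the partition's assigned replica list, so a warning is logged and UNKNOWN_TOPIC_OR_PARTITION is recorded for the partition.
- If the leader epoch in the request is not greater than the local one, a warning is logged and an error code is recorded in the response (ErrorMapping.StaleLeaderEpochCode in older versions; the code shown above records Errors.STALE_CONTROLLER_EPOCH).
- Entries of partitionState whose leader equals this broker's id go into partitionsTobeLeader; the rest go into partitionsToBeFollower.
- If partitionsTobeLeader is not empty, makeLeaders is called on it.
- If partitionsToBeFollower is not empty, makeFollowers is called on it.
- After the first LeaderAndIsr request the high-watermark checkpoint thread is started; then idle fetcher threads are shut down and the onLeadershipChange callback is invoked with the partitions that became leader and follower.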
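The checks walked through above can be condensed into a small decision function. This is a simplified sketch: `PartitionStateInfo` and the `Decision` values are hypothetical stand-ins, and the real method additionally records error codes in responseMap and calls makeLeaders/makeFollowers.

```scala
object BecomeLeaderOrFollowerSketch {
  // Only the fields the epoch and membership checks look at.
  case class PartitionStateInfo(leader: Int, leaderEpoch: Int, replicas: List[Int])

  sealed trait Decision
  case object BecomeLeader extends Decision
  case object BecomeFollower extends Decision
  case object NotAssigned extends Decision      // UNKNOWN_TOPIC_OR_PARTITION in the code above
  case object StaleLeaderEpoch extends Decision // the code above records STALE_CONTROLLER_EPOCH here

  // The whole request is rejected first if it comes from an older controller epoch.
  def acceptRequest(requestControllerEpoch: Int, knownControllerEpoch: Int): Boolean =
    requestControllerEpoch >= knownControllerEpoch

  // Per-partition decision, in the same order as the checks in becomeLeaderOrFollower.
  def decide(localBrokerId: Int, localLeaderEpoch: Int, info: PartitionStateInfo): Decision =
    if (localLeaderEpoch >= info.leaderEpoch) StaleLeaderEpoch
    else if (!info.replicas.contains(localBrokerId)) NotAssigned
    else if (info.leader == localBrokerId) BecomeLeader
    else BecomeFollower
}
```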