Each partition leader manages the ISR list for its own partition: it expands or shrinks the list as followers catch up or fall behind, and persists the result to ZooKeeper.
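For reference, the ISR that the leader persists ends up in the partition's state znode. A rough sketch of that layout (path and field names as seen in ZooKeeper-based deployments; the concrete values below are made up for illustration):

ZooKeeper path:   /brokers/topics/<topic>/partitions/<partitionId>/state
Example payload:  {"controller_epoch":3,"leader":1,"version":1,"leader_epoch":7,"isr":[1,2,3]}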
Adding a replica to the ISR: if a follower's offset has caught up to (is greater than or equal to) the leader's HW, that replica is added to the ISR.
// kafka.cluster.Partition#maybeExpandIsr
def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    // check if this replica needs to be added to the ISR
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        val replica = getReplica(replicaId).get
        val leaderHW = leaderReplica.highWatermark
        val fetchOffset = logReadResult.info.fetchOffsetMetadata.messageOffset
        if (!inSyncReplicas.contains(replica) &&
           assignedReplicas.map(_.brokerId).contains(replicaId) &&
           replica.logEndOffset.offsetDiff(leaderHW) >= 0 &&
           leaderEpochStartOffsetOpt.exists(fetchOffset >= _)) {
          val newInSyncReplicas = inSyncReplicas + replica
          info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
            s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
          // update ISR in ZK and cache
          updateIsr(newInSyncReplicas)
          replicaManager.isrExpandRate.mark()
        }
        // check if the HW of the partition can now be incremented
        // since the replica may already be in the ISR and its LEO has just incremented
        maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)
      case None => false // nothing to do if no longer leader
    }
  }
}
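Stripped of the locking and metrics, the expansion test above is just a predicate over the follower's fetch position. Below is a self-contained sketch of that check; the object, function, and parameter names are my own, not Kafka's.

// Minimal model of the ISR-expansion condition (illustrative, not Kafka's API).
object IsrExpansionCheck {
  // A follower qualifies for the ISR when:
  //  - it is one of the partition's assigned replicas,
  //  - it is not already in the ISR,
  //  - its log end offset has reached the leader's high watermark,
  //  - its fetch offset has reached the start offset of the current leader epoch.
  def shouldExpandIsr(replicaId: Int,
                      followerLeo: Long,
                      fetchOffset: Long,
                      leaderHw: Long,
                      leaderEpochStartOffset: Option[Long],
                      assignedReplicas: Set[Int],
                      inSyncReplicas: Set[Int]): Boolean = {
    !inSyncReplicas.contains(replicaId) &&
      assignedReplicas.contains(replicaId) &&
      followerLeo >= leaderHw &&
      leaderEpochStartOffset.exists(fetchOffset >= _)
  }

  def main(args: Array[String]): Unit = {
    // Follower 3 has caught up to HW = 100 and is past the epoch start offset 90 -> eligible.
    println(shouldExpandIsr(3, followerLeo = 100, fetchOffset = 100, leaderHw = 100,
      leaderEpochStartOffset = Some(90), assignedReplicas = Set(1, 2, 3), inSyncReplicas = Set(1, 2)))
  }
}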
Removing a replica from the ISR: if the time elapsed since the follower's lastCaughtUpTimeMs exceeds the configured threshold (replica.lag.time.max.ms), that replica is removed from the ISR.
// kafka.cluster.Partition#maybeShrinkIsr
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        if (outOfSyncReplicas.nonEmpty) {
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          assert(newInSyncReplicas.nonEmpty)
          info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
            newInSyncReplicas.map(_.brokerId).mkString(",")))
          // update ISR in zk and in cache
          updateIsr(newInSyncReplicas)
          // we may need to increment high watermark since ISR could be down to 1
          replicaManager.isrShrinkRate.mark()
          maybeIncrementLeaderHW(leaderReplica)
        } else {
          false
        }
      case None => false // do nothing if no longer leader
    }
  }

  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}

// kafka.cluster.Partition#getOutOfSyncReplicas
def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  /**
   * there are two cases that will be handled here -
   * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
   *                     the follower is stuck and should be removed from the ISR
   * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
   *                    then the follower is lagging and should be removed from the ISR
   * Both these cases are handled by checking the lastCaughtUpTimeMs which represents
   * the last time when the replica was fully caught up. If either of the above conditions
   * is violated, that replica is considered to be out of sync
   **/
  val candidateReplicas = inSyncReplicas - leaderReplica
  // out of sync when: current time - lastCaughtUpTimeMs > replica.lag.time.max.ms
  val laggingReplicas = candidateReplicas.filter(r =>
    (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  if (laggingReplicas.nonEmpty)
    debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))

  laggingReplicas
}
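The shrink side reduces to a single time-based predicate over lastCaughtUpTimeMs, which covers both the "stuck" and the "slow" follower cases with one check. A self-contained sketch of that predicate (class and function names are mine, not Kafka's):

// Minimal model of the ISR-shrink condition (illustrative, not Kafka's API).
object IsrShrinkCheck {
  final case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)

  // A follower is out of sync once it has not been fully caught up with the leader's LEO
  // for more than maxLagMs, whether it is stuck (no fetch progress at all) or merely slow
  // (fetching, but never reaching the LEO).
  def outOfSyncReplicas(followers: Set[FollowerState],
                        nowMs: Long,
                        maxLagMs: Long): Set[FollowerState] =
    followers.filter(f => nowMs - f.lastCaughtUpTimeMs > maxLagMs)

  def main(args: Array[String]): Unit = {
    val followers = Set(FollowerState(2, lastCaughtUpTimeMs = 90000L),
                        FollowerState(3, lastCaughtUpTimeMs = 50000L))
    // With replica.lag.time.max.ms = 30000, broker 3 (last caught up 50000 ms ago) is out of sync.
    println(outOfSyncReplicas(followers, nowMs = 100000L, maxLagMs = 30000L).map(_.brokerId))
  }
}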
The key is how _lastCaughtUpTimeMs gets updated: for each follower, the leader also keeps the leader LEO and the timestamp recorded at that follower's previous fetch (lastFetchLeaderLogEndOffset and lastFetchTimeMs).
// kafka.cluster.Replica#updateLogReadResult
def updateLogReadResult(logReadResult: LogReadResult) {
  // the follower's fetch offset has reached the leader's current LEO:
  // it is fully caught up as of this fetch
  if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
  // the follower's fetch offset has reached the leader's LEO as of the previous fetch:
  // it was fully caught up as of that earlier fetch time
  else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

  logStartOffset = logReadResult.followerLogStartOffset
  logEndOffset = logReadResult.info.fetchOffsetMetadata
  lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
  lastFetchTimeMs = logReadResult.fetchTimeMs
}
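To make the two branches concrete, here is a small self-contained replay of how _lastCaughtUpTimeMs would advance across successive fetches. It is a sketch that mirrors the logic above; the state class, field names, and values are mine, not Kafka's.

// Illustrative replay of the _lastCaughtUpTimeMs bookkeeping for one follower.
object CaughtUpTimeDemo {
  final case class FollowerView(lastCaughtUpTimeMs: Long = 0L,
                                lastFetchLeaderLogEndOffset: Long = 0L,
                                lastFetchTimeMs: Long = 0L) {
    // fetchOffset: where the follower is fetching from; leaderLeo: the leader's LEO when the fetch is served.
    def onFetch(fetchOffset: Long, leaderLeo: Long, fetchTimeMs: Long): FollowerView = {
      val caughtUpAt =
        if (fetchOffset >= leaderLeo) fetchTimeMs                             // caught up to the current LEO right now
        else if (fetchOffset >= lastFetchLeaderLogEndOffset) lastFetchTimeMs  // had caught up to the previous fetch's LEO
        else lastCaughtUpTimeMs                                               // still behind, no progress on caught-up time
      FollowerView(math.max(lastCaughtUpTimeMs, caughtUpAt), leaderLeo, fetchTimeMs)
    }
  }

  def main(args: Array[String]): Unit = {
    val s0 = FollowerView()
    // t=1000: fetch at offset 100 while the leader LEO is 100 -> fully caught up, lastCaughtUpTimeMs = 1000
    val s1 = s0.onFetch(fetchOffset = 100, leaderLeo = 100, fetchTimeMs = 1000)
    // t=2000: fetch at offset 100 while the leader LEO has moved to 150 -> only caught up to the
    // previous fetch's LEO (100), so lastCaughtUpTimeMs stays at the previous fetch time, 1000
    val s2 = s1.onFetch(fetchOffset = 100, leaderLeo = 150, fetchTimeMs = 2000)
    println(s1.lastCaughtUpTimeMs) // 1000
    println(s2.lastCaughtUpTimeMs) // 1000
  }
}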
maybeExpandIsr is triggered when a follower fetches data from the leader, whereas maybeShrinkIsr is driven by a periodic task scheduled in ReplicaManager.
// kafka.server.ReplicaManager#startup
def startup() {
  // start ISR expiration thread
  // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
  scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
  scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
  scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread", shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = TimeUnit.MILLISECONDS)

  // If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
  // In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
  // Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
  val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
  logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
  logDirFailureHandler.start()
}
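The scheduling pattern itself is just a fixed-rate background task. A stripped-down sketch of the same idea using a plain ScheduledExecutorService (the object name and the replicaLagTimeMaxMs value are illustrative, not Kafka's configuration API):

import java.util.concurrent.{Executors, TimeUnit}

// Sketch of the "check for ISR shrink every replica.lag.time.max.ms / 2" pattern.
object IsrExpirationSchedulerDemo {
  def main(args: Array[String]): Unit = {
    val replicaLagTimeMaxMs = 30000L // hypothetical value of replica.lag.time.max.ms
    val scheduler = Executors.newSingleThreadScheduledExecutor()

    // Running with half the lag threshold as the period means a lagging follower is
    // observed again before it can exceed roughly 1.5x the threshold undetected.
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        // In the real broker this step would call maybeShrinkIsr on every partition it leads.
        println(s"checking ISR shrink at ${System.currentTimeMillis()}")
      }
    }, 0L, replicaLagTimeMaxMs / 2, TimeUnit.MILLISECONDS)

    Thread.sleep(replicaLagTimeMaxMs) // let a couple of ticks run, then shut down
    scheduler.shutdownNow()
  }
}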