  • ISR management

    Each leader replica maintains the ISR list for its partition, expanding and shrinking it as needed, and writes the result to ZooKeeper.

    Adding a replica to the ISR: if the follower's LEO is greater than or equal to the leader's HW (the code below also requires that the replica is still assigned and that its fetch offset has reached the current leader epoch's start offset), the replica is added to the ISR:

    // kafka.cluster.Partition#maybeExpandIsr
      def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
        inWriteLock(leaderIsrUpdateLock) {
          // check if this replica needs to be added to the ISR
          leaderReplicaIfLocal match {
            case Some(leaderReplica) =>
              val replica = getReplica(replicaId).get
              val leaderHW = leaderReplica.highWatermark
              val fetchOffset = logReadResult.info.fetchOffsetMetadata.messageOffset
              if (!inSyncReplicas.contains(replica) &&
                 assignedReplicas.map(_.brokerId).contains(replicaId) &&
                 replica.logEndOffset.offsetDiff(leaderHW) >= 0 &&
                 leaderEpochStartOffsetOpt.exists(fetchOffset >= _)) {
                val newInSyncReplicas = inSyncReplicas + replica
                info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
                  s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
                // update ISR in ZK and cache
                updateIsr(newInSyncReplicas)
                replicaManager.isrExpandRate.mark()
              }
              // check if the HW of the partition can now be incremented
              // since the replica may already be in the ISR and its LEO has just incremented
              maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)
            case None => false // nothing to do if no longer leader
          }
        }
      }
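
    The eligibility check above can be read as a pure predicate over the replica's state. Below is a minimal sketch of that predicate, not Kafka source: offsets are reduced to plain Longs, and IsrExpandCheck, ReplicaState, and shouldExpand are hypothetical names.

      object IsrExpandCheck {
        final case class ReplicaState(brokerId: Int, logEndOffset: Long)

        def shouldExpand(replica: ReplicaState,
                         inSyncIds: Set[Int],
                         assignedIds: Set[Int],
                         leaderHW: Long,
                         fetchOffset: Long,
                         leaderEpochStartOffset: Option[Long]): Boolean =
          !inSyncIds.contains(replica.brokerId) &&          // not already in the ISR
            assignedIds.contains(replica.brokerId) &&       // still an assigned replica
            replica.logEndOffset >= leaderHW &&             // LEO has caught up to the leader's HW
            leaderEpochStartOffset.exists(fetchOffset >= _) // fetched past the leader epoch start offset

        def main(args: Array[String]): Unit = {
          val r = ReplicaState(brokerId = 2, logEndOffset = 100L)
          // HW = 100, fetch offset = 100, epoch started at 90 => eligible to rejoin the ISR
          println(shouldExpand(r, inSyncIds = Set(1), assignedIds = Set(1, 2),
            leaderHW = 100L, fetchOffset = 100L, leaderEpochStartOffset = Some(90L))) // true
        }
      }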

    Removing replicas from the ISR: a follower is removed if the time since its lastCaughtUpTimeMs exceeds the configured threshold (replica.lag.time.max.ms):

    // kafka.cluster.Partition#maybeShrinkIsr 
      def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
        val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
          leaderReplicaIfLocal match {
            case Some(leaderReplica) =>
              val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
              if(outOfSyncReplicas.nonEmpty) {
                val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
                assert(newInSyncReplicas.nonEmpty)
                info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
                  newInSyncReplicas.map(_.brokerId).mkString(",")))
                // update ISR in zk and in cache
                updateIsr(newInSyncReplicas)
                // we may need to increment high watermark since ISR could be down to 1
    
                replicaManager.isrShrinkRate.mark()
                maybeIncrementLeaderHW(leaderReplica)
              } else {
                false
              }
    
            case None => false // do nothing if no longer leader
          }
        }
    
        // some delayed operations may be unblocked after HW changed
        if (leaderHWIncremented)
          tryCompleteDelayedRequests()
      }
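
    After a shrink the ISR may be down to just the leader, so the high watermark can advance. Roughly, maybeIncrementLeaderHW moves the HW to the minimum LEO across the replicas that still count as in sync (the real code also considers replicas whose lastCaughtUpTimeMs is within replica.lag.time.max.ms). A minimal sketch of that rule, not Kafka source and with hypothetical names:

      object HwAfterShrink {
        // HW = min LEO over the in-sync set; the HW never moves backwards
        def newHighWatermark(inSyncLeos: Set[Long], currentHw: Long): Long =
          math.max(inSyncLeos.min, currentHw)

        def main(args: Array[String]): Unit = {
          // ISR was {leader LEO = 100, follower LEO = 80}; once the slow follower
          // is removed, only the leader's LEO bounds the HW: it advances 80 -> 100
          println(newHighWatermark(inSyncLeos = Set(100L), currentHw = 80L)) // 100
        }
      }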
      
    // kafka.cluster.Partition#getOutOfSyncReplicas
      def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
        /**
         * there are two cases that will be handled here -
         * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
         *                     the follower is stuck and should be removed from the ISR
         * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
         *                    then the follower is lagging and should be removed from the ISR
         * Both these cases are handled by checking the lastCaughtUpTimeMs which represents
         * the last time when the replica was fully caught up. If either of the above conditions
         * is violated, that replica is considered to be out of sync
         *
         **/
        val candidateReplicas = inSyncReplicas - leaderReplica
        // current time - lastCaughtUpTimeMs > replica.lag.time.max.ms
        val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
        if (laggingReplicas.nonEmpty)
          debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))
    
        laggingReplicas
      }
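
    Both the stuck case and the slow case collapse into this one time-based check. A minimal sketch of the filter, not Kafka source and with hypothetical names:

      object OutOfSyncCheck {
        final case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)

        // a follower is out of sync once it has not been fully caught up for maxLagMs
        def outOfSync(followers: Set[FollowerState], nowMs: Long, maxLagMs: Long): Set[FollowerState] =
          followers.filter(f => nowMs - f.lastCaughtUpTimeMs > maxLagMs)

        def main(args: Array[String]): Unit = {
          val followers = Set(
            FollowerState(1, lastCaughtUpTimeMs = 99000L), // caught up 1s ago: stays in sync
            FollowerState(2, lastCaughtUpTimeMs = 60000L)  // stuck or slow for 40s: out of sync
          )
          println(outOfSync(followers, nowMs = 100000L, maxLagMs = 30000L).map(_.brokerId)) // Set(2)
        }
      }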

    The key is how _lastCaughtUpTimeMs is set: the leader remembers the leader LEO and the fetch timestamp from the follower's previous fetch.

    // kafka.cluster.Replica#updateLogReadResult
      def updateLogReadResult(logReadResult: LogReadResult) {
        // the follower's fetch offset has reached the leader's current LEO
        if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
          _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
        // the follower's fetch offset has reached the leader's LEO as of the previous fetch
        else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
          _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)
    
        logStartOffset = logReadResult.followerLogStartOffset
        logEndOffset = logReadResult.info.fetchOffsetMetadata
        lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
        lastFetchTimeMs = logReadResult.fetchTimeMs
      }
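
    A short trace makes the two branches concrete. The sketch below is not Kafka source; it uses simplified stand-ins for the fields above and replays three fetches, showing that a follower that keeps falling behind stops getting caught-up credit:

      object CaughtUpTrace {
        var lastCaughtUpTimeMs = 0L
        var lastFetchLeaderLogEndOffset = 0L
        var lastFetchTimeMs = 0L

        def onFetch(fetchOffset: Long, leaderLeo: Long, fetchTimeMs: Long): Unit = {
          if (fetchOffset >= leaderLeo)                        // caught up to the current leader LEO
            lastCaughtUpTimeMs = math.max(lastCaughtUpTimeMs, fetchTimeMs)
          else if (fetchOffset >= lastFetchLeaderLogEndOffset) // caught up to the LEO of the previous fetch
            lastCaughtUpTimeMs = math.max(lastCaughtUpTimeMs, lastFetchTimeMs)
          lastFetchLeaderLogEndOffset = leaderLeo
          lastFetchTimeMs = fetchTimeMs
        }

        def main(args: Array[String]): Unit = {
          onFetch(fetchOffset = 100, leaderLeo = 100, fetchTimeMs = 1000) // fully caught up at t=1000
          onFetch(fetchOffset = 100, leaderLeo = 150, fetchTimeMs = 2000) // reached the previous LEO: credited t=1000
          onFetch(fetchOffset = 120, leaderLeo = 200, fetchTimeMs = 3000) // behind even the previous LEO: no credit
          println(lastCaughtUpTimeMs) // 1000 -- this follower has not been caught up since t=1000
        }
      }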

    maybeExpandIsr is triggered when a follower fetches data, while maybeShrinkIsr is triggered by a periodic task:

    // kafka.server.ReplicaManager#startup
      def startup() {
        // start ISR expiration thread
        // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
        scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
        scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
        scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread", shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = TimeUnit.MILLISECONDS)
    
        // If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
        // In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
    // Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
        val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
        logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
        logDirFailureHandler.start()
      }
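
    The in-code comment's "up to config.replicaLagTimeMaxMs x 1.5" follows from the schedule: the shrink check runs every replicaLagTimeMaxMs / 2, so in the worst case a follower that stops catching up just after a check survives for maxLag plus one period. A minimal sketch of that arithmetic, not Kafka source:

      object ShrinkBound {
        // delay between the follower's last caught-up time and the check tick that removes it
        def removalDelayMs(caughtUpAtMs: Long, maxLagMs: Long): Long = {
          val period = maxLagMs / 2
          // first check tick at which (tick - caughtUpAtMs) > maxLagMs
          Iterator.from(1).map(_ * period).find(t => t - caughtUpAtMs > maxLagMs).get - caughtUpAtMs
        }

        def main(args: Array[String]): Unit = {
          // follower stops catching up just after the check at t = 5000
          println(removalDelayMs(caughtUpAtMs = 5001, maxLagMs = 10000)) // 14999 ms, i.e. ~1.5 * maxLag
        }
      }
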
  • Original post: https://www.cnblogs.com/allenwas3/p/13149594.html