zoukankan      html  css  js  c++  java
  • 【Kafka源码】ReplicaManager启动过程

    在KafkaServer启动过程的入口中,会启动Replica Manager,众所周知,这是一个副本管理器。replica在Kafka中扮演的角色很重要,是保证消息不丢失的一个重要概念。

    replica的个人理解概念如下:producer发送的消息给broker,broker是分为多个partition的,对于同一个partition中的broker,这些机器是有主从的概念的。producer只会向leader写入消息,consumer只会从leader读取消息,(leader负责读写,replica保证消息不丢)。为了保证消息不丢失,follower会定时从leader拉取消息,保持与leader的消息同步。当然,producer可以配置是否需要有follower同步成功,以及需要多少个replica,(即需要多少个ack)才算是消息发送成功。这块看个人的需求。

    下面我们看下Replica Manager的启动过程。

    一、入口

    入口在KafkaServer的start方法中,比较简洁:

    replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,isShuttingDown)
    replicaManager.startup()
    

    我们主要看下ReplicaManager里面都有什么内容。

    二、ReplicaManager实例化

    我们看看实例化的过程:

    /* epoch of the controller that last changed the leader */
    @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
    private val localBrokerId = config.brokerId
    private val allPartitions = new Pool[(String, Int), Partition](valueFactory = Some { case (t, p) =>
      new Partition(t, p, time, this)
    })
    private val replicaStateChangeLock = new Object
    val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime, threadNamePrefix)
    private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
    val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
    private var hwThreadInitialized = false
    this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
    val stateChangeLogger = KafkaController.stateChangeLogger
    private val isrChangeSet: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
    private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
    private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
    
    val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
        purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
    val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
        purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
    
    • 首先是这个controllerEpoch,这个值表示的是leader发生变化时controller的epoch。epoch存储在zk中的/Controller_epoch中。
    • 第二步是从配置broker.id中获取当前机器的brokerId。
    • 实例化ReplicaFetcherManager,是一个follower从leader拉取消息的管理器,这里面有文章。
    • 设置highWatermarkCheckPointThreadStarted为false,为了后续启动相关的线程用。
    • 从文件(replication-offset-checkpoint)中获取所有topic和partition的HW,这个文件中存储了每个topic和partition对应的最新的checkPoint对应的offset值。HW表示的是topic的partition对应的最后一次commit的消息的offset值,也是用于消息完整性的保证。
    • 定义了isrChangerSet,表示了isr改变顺序的集合。至于isr是干啥的,网上的内容比较多,搜索即可。
    • 最后涉及到两个配置,分别是:
      • producer.purgatory.purge.interval.requests:默认值1000,用于在procucer的ack设置是-1或者1时,跟踪消息是否添加成功,使用DelayedProduce实现。成功后清除。
      • fetch.purgatory.purge.interval.requests:默认值1000,fetch 请求清除时的清除间隔

    三、启动ReplicaManager

    我们主要看下ReplicaManager的start方法:

    def startup() {
    // start ISR expiration thread
    scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
    }
    

    这块主要启动了两个定时任务,分别是maybeShrinkIsr和maybePropagateIsrChanges。下面我们着重分析下。

    3.1 maybeShrinkIsr

    这个方法的调用时间间隔由配置replica.lag.time.max.ms控制,主要用于检查partition对应的isr列表中是否有心跳过期的isr。

      private def maybeShrinkIsr(): Unit = {
        trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
        allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
      }
    

    这块主要是遍历了所有的partition,每个partition都执行maybeShrinkIsr方法,下面我们进入maybeShrinkIsr,分析下主要做了哪些事情。

      def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
        val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
          leaderReplicaIfLocal() match {
            case Some(leaderReplica) =>
              val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
              if(outOfSyncReplicas.size > 0) {
                val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
                assert(newInSyncReplicas.size > 0)
                info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
                  inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
                // update ISR in zk and in cache
                updateIsr(newInSyncReplicas)
                // we may need to increment high watermark since ISR could be down to 1
    
                replicaManager.isrShrinkRate.mark()
                maybeIncrementLeaderHW(leaderReplica)
              } else {
                false
              }
    
            case None => false // do nothing if no longer leader
          }
        }
    

    整个步骤如下:

    • leaderReplicaIfLocal:先检查当前的partition的leader是否为当前的broker,如果为是,就不进入下面的方法,否则进入下面的方法。
    • getOutOfSyncReplicas:获取不同步的replica列表,获取的方法是首先从isr中去除掉leader,然后把当前时间-lastCaughtUpTimeMs大于replicaMaxLagTimeMs的replica筛选出来,即为outOfSyncReplicas。这里面的lastCaughtUpTimeMs是指上次同步的时间,不一定是心跳时间。
    • 如果outOfSyncReplicas中存在replica,则继续。两个列表进行差值运算后得到新的isr列表,之后更新isr列表(即zk中的数据)。
    • 最后可能需要更新下HW

    3.2 maybePropagateIsrChanges

    这个方法的调用时间是固定的,不由配置决定,代码中写死,为2500ms。这个方法会把isr的变化内容更新到zk中去,执行这个方法的条件是:

    • ISR变化没有被广播出去
    • 最近5s内没有ISR变化或者上次广播的时间距离当前时间超过了60s,其实这里的广播就是指写入到zk中
      def maybePropagateIsrChanges() {
        val now = System.currentTimeMillis()
        isrChangeSet synchronized {
          if (isrChangeSet.nonEmpty &&
            (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
              lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
            ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
            isrChangeSet.clear()
            lastIsrPropagationMs.set(now)
          }
        }
      }
    
  • 相关阅读:
    DPDK安装方法 17.12.13
    numa.h:No such file or directory 解决方法
    17秋 软件工程 第六次作业 Beta冲刺 Scrum3
    17秋 软件工程 第六次作业 Beta冲刺 总结博客
    17秋 软件工程 第六次作业 Beta冲刺 Scrum2
    Paper Reviews and Presentations
    17秋 软件工程 第六次作业 Beta冲刺 Scrum1
    17秋 软件工程 第六次作业 Beta冲刺
    error: could not create '/System/Library/Frameworks/Python.framework/Versions/2.7/share': Operation not permitted
    17秋 软件工程 个人作业 软件产品案例分析
  • 原文地址:https://www.cnblogs.com/f-zhao/p/7803161.html
Copyright © 2011-2022 走看看