  • Apache Kafka Source Code Analysis

    The broker configuration includes auto.leader.rebalance.enable (false).

    So how is this leader rebalance actually carried out?

    First, when the controller starts up, it starts a scheduler:

    if (config.autoLeaderRebalanceEnable) { // if auto leader rebalance is enabled, partition leaders that moved because a broker died need to be moved back
      info("starting the partition rebalance scheduler")
      autoRebalanceScheduler.startup()
      autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
        5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
    }
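    The scheduling pattern above (an initial delay of 5, then one run per check interval) can be sketched with the JDK's ScheduledExecutorService. This is only an illustration under assumptions: RebalanceSchedulerSketch, runPeriodically and the check callback are hypothetical names, fixed-rate semantics are assumed, and milliseconds are used so the sketch finishes quickly.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object RebalanceSchedulerSketch {
  // `check` is a hypothetical stand-in for checkAndTriggerPartitionRebalance.
  // Runs `check` after `initialDelayMs`, then every `periodMs`, and returns
  // true once it has run `runs` times (or false after a 5 second timeout).
  def runPeriodically(initialDelayMs: Long, periodMs: Long, runs: Int)(check: () => Unit): Boolean = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val latch = new CountDownLatch(runs)
    val task = new Runnable {
      override def run(): Unit = {
        check()
        latch.countDown()
      }
    }
    executor.scheduleAtFixedRate(task, initialDelayMs, periodMs, TimeUnit.MILLISECONDS)
    try latch.await(5, TimeUnit.SECONDS)
    finally executor.shutdownNow() // stop the background thread
  }
}
```

    Note that with scheduleAtFixedRate an exception thrown by the task suppresses all later runs, so a real periodic check would normally catch and log its own errors.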

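    The scheduler then periodically runs

    checkAndTriggerPartitionRebalance

    This function finds every partition whose leader has moved away from its preferred replica, i.e.

    topicsNotInPreferredReplica

    and, if the imbalance ratio exceeds the threshold, automatically triggers a leader rebalance that moves leadership back to the preferred replica.

    The key is to understand what preferred replicas are.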

    preferredReplicasForTopicsByBrokers =
              controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
                case(topicAndPartition, assignedReplicas) => assignedReplicas.head
              }
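    partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]]
    A TopicAndPartition uniquely identifies a partition by topic name and partition id; the Seq[Int] holds broker ids and says which brokers host the replicas of that partition.

    The code filters topics queued for deletion out of the partition replica assignment and then groups by assignedReplicas.head, that is, by the first broker id in the Seq.
    In other words, by default the preferred replica of each partition is the first replica that was assigned.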
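    To make the grouping concrete, here is a minimal standalone sketch. PreferredReplicaSketch, its simplified TopicAndPartition case class and sampleAssignment are made up for illustration; they are not Kafka's own classes, and the deletion filter is left out.

```scala
object PreferredReplicaSketch {
  // Simplified stand-in for Kafka's TopicAndPartition (assumption for this sketch).
  final case class TopicAndPartition(topic: String, partition: Int)

  // Hypothetical assignment: partition -> broker ids hosting its replicas.
  val sampleAssignment: Map[TopicAndPartition, Seq[Int]] = Map(
    TopicAndPartition("t", 0) -> Seq(1, 2, 3),
    TopicAndPartition("t", 1) -> Seq(2, 3, 1),
    TopicAndPartition("t", 2) -> Seq(1, 3, 2)
  )

  // The preferred replica is the first broker id in the assigned replica list,
  // so grouping by `head` yields: broker -> partitions it should lead.
  def preferredReplicasByBroker(
      assignment: Map[TopicAndPartition, Seq[Int]]
  ): Map[Int, Map[TopicAndPartition, Seq[Int]]] =
    assignment.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
}
```

    With the sample data, broker 1 is preferred for partitions 0 and 2, broker 2 for partition 1, and broker 3 does not appear as a key at all because it is never first in any replica list.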

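    The result of the groupBy maps each broker to all the partitions that should have that broker as leader, i.e.

    case(leaderBroker, topicAndPartitionsForBroker)

    Next, find the partitions among them whose current leader is not the preferred one, i.e. whose leader has moved.
    This is simple: compare against the leader in leaderAndIsr; if they differ, the leader has moved.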

    topicsNotInPreferredReplica =
                  topicAndPartitionsForBroker.filter {
                    case(topicPartition, replicas) => {
                      controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                      controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                    }
                  }
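    The same filter can be tried outside the controller with plain maps. In this sketch NotInPreferredSketch, notLedByPreferred and the currentLeaders map are hypothetical; currentLeaders stands in for the leader recorded in partitionLeadershipInfo.

```scala
object NotInPreferredSketch {
  // Simplified stand-in for Kafka's TopicAndPartition (assumption for this sketch).
  final case class TopicAndPartition(topic: String, partition: Int)

  // Keeps only the partitions that have a known leader and whose current
  // leader differs from the broker that should be leading them.
  def notLedByPreferred(
      leaderBroker: Int,
      partitionsForBroker: Map[TopicAndPartition, Seq[Int]],
      currentLeaders: Map[TopicAndPartition, Int]
  ): Map[TopicAndPartition, Seq[Int]] =
    partitionsForBroker.filter { case (tp, _) =>
      currentLeaders.get(tp).exists(_ != leaderBroker)
    }
}
```

    A partition with no leadership entry is skipped rather than counted as moved, mirroring the contains check in the excerpt.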

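    Moreover, a rebalance is triggered only when the imbalanceRatio of a broker is greater than 10% (the default of leader.imbalance.per.broker.percentage):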

    imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
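    As a worked example: if a broker is the preferred leader for 20 partitions and 3 of them are currently led elsewhere, the ratio is 3 / 20 = 0.15, which is above 0.10, so a rebalance is triggered; with 2 of 20 the ratio is exactly 0.10 and nothing happens. The sketch below encodes that arithmetic; ImbalanceRatioSketch and its function names are hypothetical, and the strict comparison is an assumption based on the "greater than" wording.

```scala
object ImbalanceRatioSketch {
  // Fraction of a broker's preferred partitions that it does not currently lead.
  // `total` is assumed to be at least 1, since a broker only shows up in the
  // grouping when it is preferred for at least one partition.
  def imbalanceRatio(notLedByBroker: Int, total: Int): Double =
    notLedByBroker.toDouble / total

  // Strict comparison: a ratio exactly equal to the threshold does not trigger.
  def shouldRebalance(notLedByBroker: Int, total: Int, thresholdPercent: Int): Boolean =
    imbalanceRatio(notLedByBroker, total) > thresholdPercent.toDouble / 100
}
```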

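    For the leader move of each partition,
    the preferred broker must be alive, and no partition may currently be undergoing reassignment or preferred replica election. This means the process cannot run in parallel with those operations; running it alongside a reassignment could easily cause conflicts.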

    // do this check only if the broker is live and there are no partitions being reassigned currently
    // and preferred replica election is not in progress
    if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
        controllerContext.partitionsBeingReassigned.size == 0 &&
        controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
        !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
        controllerContext.allTopics.contains(topicPartition.topic)) {
      onPreferredReplicaElection(Set(topicPartition), true)
    }

    onPreferredReplicaElection

    This again goes through partitionStateMachine to change the partition state:

    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)

    partitionStateMachine will be analyzed separately; here it is enough to know that the partition's state transition is OnlinePartition -> OnlinePartition,
    with preferredReplicaPartitionLeaderSelector used as the leader selection strategy.

     

    PreferredReplicaPartitionLeaderSelector

    The strategy is simple: make the preferred replica the leader.

    def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
      val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
      val preferredReplica = assignedReplicas.head  // take the first replica in the AR as the preferred one
      // check if preferred replica is the current leader
      val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
      if (currentLeader == preferredReplica) { // nothing to do if the current leader is already the preferred replica
        throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s" .format(preferredReplica, topicAndPartition))
      } else {
        info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) + " Trigerring preferred replica leader election")
        // check if preferred replica is not the current leader and is alive and in the isr
        if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) { // the preferred replica's broker must be alive and in the ISR
          (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr, currentLeaderAndIsr.zkVersion + 1), assignedReplicas) // build the new leaderAndIsr
        } else {
          throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
            "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
        }
      }
    }
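    The three outcomes of the selector (already leader, elected, not eligible) can be modelled as a pure function. This is a simplified sketch, not Kafka's implementation: PreferredLeaderSelectorSketch, its LeaderAndIsr case class and selectPreferredLeader are hypothetical, and the two exceptions are replaced by Left values so the function is easy to test.

```scala
object PreferredLeaderSelectorSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr (assumption for this sketch).
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  def selectPreferredLeader(
      assignedReplicas: Seq[Int],
      current: LeaderAndIsr,
      liveBrokerIds: Set[Int]
  ): Either[String, LeaderAndIsr] = {
    val preferred = assignedReplicas.head // first assigned replica is the preferred one
    if (current.leader == preferred)
      Left(s"preferred replica $preferred is already the leader")
    else if (liveBrokerIds.contains(preferred) && current.isr.contains(preferred))
      // Same ISR, new leader, leader epoch and zkVersion each bumped by one.
      Right(current.copy(
        leader = preferred,
        leaderEpoch = current.leaderEpoch + 1,
        zkVersion = current.zkVersion + 1))
    else
      Left(s"preferred replica $preferred is not alive or not in the ISR")
  }
}
```

    For example, with assigned replicas Seq(1, 2, 3), current leader 2 and an ISR of List(1, 2), the result is a Right whose leader is 1, provided broker 1 is in liveBrokerIds.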
  • Original article: https://www.cnblogs.com/fxjwind/p/4913819.html