  • [Kafka Source Code] The KafkaController Startup Process


    Previous posts covered much of what gets loaded during Kafka startup. We also know that a topic is divided into many partitions, that each partition has a leader and followers, and that data is replicated from the leader to the followers. So who manages all of these partitions? Is there a leader among the brokers themselves? That is the subject of this post: the KafkaController.

    1. Entry Point

    The startup entry point for the KafkaController is just as concise; it sits in KafkaServer's startup method.

    /* start kafka controller */
    kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
    kafkaController.startup()
    

    A KafkaController is instantiated first, and then its startup() method is called.

    2. Instantiating the Controller

    The instantiation code, with the fields explained in the comments:

    this.logIdent = "[Controller " + config.brokerId + "]: "
    private var isRunning = true
    private val stateChangeLogger = KafkaController.stateChangeLogger
    // controller context
    val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
    // partition state machine
    val partitionStateMachine = new PartitionStateMachine(this)
    // replica state machine
    val replicaStateMachine = new ReplicaStateMachine(this)
    // elector that decides which broker acts as the controller
    private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
    	onControllerResignation, config.brokerId)
    // have a separate scheduler for the controller to be able to start and stop independently of the
    // kafka server
    // scheduler for the automatic leader-rebalance task
    private val autoRebalanceScheduler = new KafkaScheduler(1)
    // topic deletion manager
    var deleteTopicManager: TopicDeletionManager = null
    // leader selector for offline partitions
    val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
    // leader selector used during partition reassignment
    private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
    // leader selector that picks the preferred replica as leader
    private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
    // leader selector used during a controlled broker shutdown
    private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
    
    private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
    
    // listener for partition reassignment
    private val partitionReassignedListener = new PartitionsReassignedListener(this)
    // listener for preferred replica election
    private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
    // listener for ISR change notifications
    private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
    

    3. Controller Startup

    Straight to the code:

    def startup() = {
    	inLock(controllerContext.controllerLock) {
    		info("Controller starting up")
    		registerSessionExpirationListener()
    		isRunning = true
    		controllerElector.startup
    		info("Controller startup complete")
    	}
    }
    

    Calling startup() does not mean the current broker is the controller; it only registers the broker with ZooKeeper. The election then happens through ZooKeeper, and once a controller has been elected, a series of operations run on that broker, as we will see below.
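    To make the mechanism concrete, here is a minimal standalone sketch of the ephemeral-node election pattern using the raw ZooKeeper client. The connect string, session timeout and payload are made-up placeholders; the real election goes through ZookeeperLeaderElector and ZKCheckedEphemeral, which we will see below.

    import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

    object ControllerElectionSketch {
      val zkConnect      = "localhost:2181"   // placeholder connect string
      val controllerPath = "/controller"

      def main(args: Array[String]): Unit = {
        val zk = new ZooKeeper(zkConnect, 30000, new Watcher {
          override def process(event: WatchedEvent): Unit = () // events ignored in this sketch
        })
        val payload = """{"version":1,"brokerid":0,"timestamp":"0"}""".getBytes("UTF-8")
        try {
          // The ephemeral node vanishes when this session dies, which is what
          // lets the other brokers detect a dead controller and re-elect.
          zk.create(controllerPath, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
          println("became controller")
        } catch {
          case _: KeeperException.NodeExistsException =>
            // Another broker won the race; just read who it is.
            val data = new String(zk.getData(controllerPath, false, null), "UTF-8")
            println(s"controller already elected: $data")
        } finally {
          zk.close()
        }
      }
    }

    Because the node is ephemeral, it disappears when the owning session dies, which is exactly what allows the remaining brokers to notice the controller is gone and elect a new one.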

    3.1 registerSessionExpirationListener

    First, the broker registers a session expiration listener. Let's take a look at it.

    private def registerSessionExpirationListener() = {
    	zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
    }
    	
    class SessionExpirationListener() extends IZkStateListener with Logging {
    	this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
    
    	@throws(classOf[Exception])
    	def handleStateChanged(state: KeeperState) {
    		// do nothing, since zkclient will do reconnect for us.
    	}
    	/**
    	* Called after the zookeeper session has expired and a new session has been created. You would have to re-create
    	* any ephemeral nodes here.
    	*
    	* @throws Exception
    	* On any error.
    	*/
    	@throws(classOf[Exception])
    	def handleNewSession() {
    		info("ZK expired; shut down all controller components and try to re-elect")
    		inLock(controllerContext.controllerLock) {
    			onControllerResignation()
    			controllerElector.elect
    		}
    	}
    
    	override def handleSessionEstablishmentError(error: Throwable): Unit = {
    		//no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError
    	}
    }
    

    As you can see, when the broker's ZooKeeper session expires, the broker does not actively reconnect; zkclient reconnects on its behalf. Once a new session has been created, i.e. the broker has rejoined the broker list, two operations are performed:

    • onControllerResignation: resign as controller and clean up controller state
    • controllerElector.elect: run a new controller election

    Let's look at what each of them does.

    3.1.1 onControllerResignation

    The code is fairly self-explanatory: it mainly cleans up the controller's internal data structures.

    /**
    * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
    * required to clean up internal controller data structures
    */
    def onControllerResignation() {
    	debug("Controller resigning, broker id %d".format(config.brokerId))
    	// de-register listeners
    	deregisterIsrChangeNotificationListener()
    	deregisterReassignedPartitionsListener()
    	deregisterPreferredReplicaElectionListener()
    
    	// shutdown delete topic manager
    	if (deleteTopicManager != null)
    		deleteTopicManager.shutdown()
    
    	// shutdown leader rebalance scheduler
    	if (config.autoLeaderRebalanceEnable)
    		autoRebalanceScheduler.shutdown()
    
    	inLock(controllerContext.controllerLock) {
    		// de-register partition ISR listener for on-going partition reassignment task
    		deregisterReassignedPartitionsIsrChangeListeners()
    		// shutdown partition state machine
    		partitionStateMachine.shutdown()
    		// shutdown replica state machine
    		replicaStateMachine.shutdown()
    		// shutdown controller channel manager
    		if (controllerContext.controllerChannelManager != null) {
    			controllerContext.controllerChannelManager.shutdown()
    			controllerContext.controllerChannelManager = null
    		}
    		// reset controller context
    		controllerContext.epoch = 0
    		controllerContext.epochZkVersion = 0
    		brokerState.newState(RunningAsBroker) // switch this broker's state from controller back to plain broker
    
    		info("Broker %d resigned as the controller".format(config.brokerId))
    	}
    }
    

    3.1.2 controllerElector.elect

    This re-runs the controller election.

    def elect: Boolean = {
      val timestamp = SystemTime.milliseconds.toString
      val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

      leaderId = getControllerID
      /*
       * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
       * it's possible that the controller has already been elected when we get here. This check will prevent the following
       * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
       */
      if (leaderId != -1) {
        debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
        return amILeader
      }

      try {
        val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                        electString,
                                                        controllerContext.zkUtils.zkConnection.getZookeeper,
                                                        JaasUtils.isZkSecurityEnabled())
        zkCheckedEphemeral.create()
        info(brokerId + " successfully elected as leader")
        leaderId = brokerId
        onBecomingLeader()
      } catch {
        case e: ZkNodeExistsException =>
          // If someone else has written the path, then
          leaderId = getControllerID

          if (leaderId != -1)
            debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
          else
            warn("A leader has been elected but just resigned, this will result in another round of election")

        case e2: Throwable =>
          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
          resign()
      }
      amILeader
    }
    

    This method performs the controller election itself. What we care about here is what happens once the current broker has been elected controller, i.e. onBecomingLeader. Looking back at the instantiation code, that callback is onControllerFailover.

    def onControllerFailover() {
    	if (isRunning) {
    		info("Broker %d starting become controller state transition".format(config.brokerId))
    		//read controller epoch from zk
    		readControllerEpochFromZookeeper()
    		// increment the controller epoch
    		incrementControllerEpoch(zkUtils.zkClient)
    		// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
    		registerReassignedPartitionsListener()
    		registerIsrChangeNotificationListener()
    		registerPreferredReplicaElectionListener()
    		partitionStateMachine.registerListeners()
    		replicaStateMachine.registerListeners()
    		initializeControllerContext()
    		replicaStateMachine.startup()
    		partitionStateMachine.startup()
    		// register the partition change listeners for all existing topics on failover
    		controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    		info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
    		brokerState.newState(RunningAsController)
    		maybeTriggerPartitionReassignment()
    		maybeTriggerPreferredReplicaElection()
    		/* send partition leadership info to all live brokers */
    		sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    		if (config.autoLeaderRebalanceEnable) {
    			info("starting the partition rebalance scheduler")
    			autoRebalanceScheduler.startup()
    			autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
    				5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    		}
    		deleteTopicManager.start()
    	}
    	else
    		info("Controller has been shut down, aborting startup/failover")
    }
    

    Quite a lot happens here; step by step:

    • First read the controller epoch from ZooKeeper
    • Increment the epoch and write it back to ZooKeeper (a sketch of this conditional update appears after this list)
    • Register a series of listeners
    • Initialize the controller context
    • Start the replica and partition state machines
    • Register a partition-change listener for every existing topic
    • Schedule the periodic partition-rebalance check (if auto leader rebalancing is enabled)
    • Start the topic deletion manager

    There is a lot going on in these steps; we will analyze them in detail in later posts.
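    The epoch bump in the second step is essentially a compare-and-set on a persistent znode. Below is a rough sketch of that pattern with the raw ZooKeeper client; the /controller_epoch path and the plain-integer payload are assumptions made for illustration, and the real code goes through zkUtils helpers rather than a retry loop (a failed conditional update there is treated as another controller having taken over).

    import org.apache.zookeeper.{KeeperException, ZooKeeper}
    import org.apache.zookeeper.data.Stat

    object EpochIncrementSketch {
      // Bump an integer epoch stored at `path` with a version-checked write,
      // retrying if the node was updated concurrently in between.
      def incrementEpoch(zk: ZooKeeper, path: String = "/controller_epoch"): Int = {
        while (true) {
          val stat  = new Stat()
          val epoch = new String(zk.getData(path, false, stat), "UTF-8").toInt
          try {
            // setData with an expected version fails with BadVersionException
            // if someone else modified the node first (optimistic locking).
            zk.setData(path, (epoch + 1).toString.getBytes("UTF-8"), stat.getVersion)
            return epoch + 1
          } catch {
            case _: KeeperException.BadVersionException => // lost the race, re-read and retry
          }
        }
        -1 // unreachable
      }
    }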

    3.2 controllerElector.startup

    def startup {
      inLock(controllerContext.controllerLock) {
        controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
        elect
      }
    }
    

    Here electionPath is /controller. Let's look at leaderChangeListener next.

    3.2.1 leaderChangeListener

    class LeaderChangeListener extends IZkDataListener with Logging {
      /**
       * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
       * @throws Exception On any error.
       */
      @throws(classOf[Exception])
      def handleDataChange(dataPath: String, data: Object) {
        inLock(controllerContext.controllerLock) {
          val amILeaderBeforeDataChange = amILeader
          leaderId = KafkaController.parseControllerId(data.toString)
          info("New leader is %d".format(leaderId))
          // The old leader needs to resign leadership if it is no longer the leader
          if (amILeaderBeforeDataChange && !amILeader)
            onResigningAsLeader()
        }
      }

      /**
       * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
       * @throws Exception
       *             On any error.
       */
      @throws(classOf[Exception])
      def handleDataDeleted(dataPath: String) {
        inLock(controllerContext.controllerLock) {
          debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
            .format(brokerId, dataPath))
          if (amILeader)
            onResigningAsLeader()
          elect
        }
      }
    }
    

    The listener watches the corresponding ZooKeeper node. When the node's data changes, handleDataChange is invoked; it mainly records the new leaderId. If this broker was the leader before the change and the new leader is someone else, onResigningAsLeader is called to clean up the old leader state.

    If the node is deleted, handleDataDeleted is invoked. If this broker is currently the leader, it first calls onResigningAsLeader and then starts a new leader election.
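    For reference, the data stored at /controller is the small JSON blob written by elect, e.g. {"version":1,"brokerid":2,"timestamp":"..."}. The sketch below shows roughly what KafkaController.parseControllerId boils down to, using a plain regex instead of Kafka's Json helper purely for illustration.

    object ParseControllerIdSketch {
      private val BrokerIdPattern = """"brokerid"\s*:\s*(\d+)""".r

      // Pull the broker id out of the JSON stored at /controller.
      def parseControllerId(controllerInfo: String): Int =
        BrokerIdPattern.findFirstMatchIn(controllerInfo)
          .map(_.group(1).toInt)
          .getOrElse(throw new IllegalArgumentException(s"Malformed controller info: $controllerInfo"))

      def main(args: Array[String]): Unit =
        println(parseControllerId("""{"version":1,"brokerid":2,"timestamp":"1510000000000"}""")) // prints 2
    }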

    3.2.2 elect

    This is the controller (i.e. leader) election method, identical to the one covered in 3.1.2.

  • Original post: https://www.cnblogs.com/f-zhao/p/7805197.html