zoukankan      html  css  js  c++  java
  • Kafka控制器选举流程剖析

    1.概述

      平时在使用Kafka的时候,可能关注的更多的是Kafka系统层面的。今天来给大家剖析一下Kafka的控制器,了解一下Kafka控制器的选举流程。

    2.内容

      Kafka控制器,其实就是一个Kafka系统的Broker。它除了具有一般Broker的功能之外,还具有选举主题分区Leader节点的功能。在启动Kafka系统时,其中一个Broker会被选举为控制器,负责管理主题分区和副本状态,还会执行分区重新分配的管理任务。

      如果在Kafka系统运行过程中,当前的控制器出现故障导致不可用,那么Kafka系统会从其他正常运行的Broker中重新选举出新的控制器。

    2.1 控制器启动顺序

      在Kafka集群中,每个Broker在启动时会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的Leader节点,步骤如下:

    • 第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/controller,并写入该节点的注册信息,使该节点成为控制器;
    • 其他的代理节点陆续启动时,也会尝试在Zookeeper系统中创建/controller节点,但是由于/controller节点已经存在,所以会抛出“创建/controller节点失败异常”的信息。创建失败的代理节点会根据返回的结果,判断出在Kafka集群中已经有一个控制器被成功创建了,所以放弃创建/controller节点,这样就确保了Kafka集群控制器的唯一性;
    • 其他的代理节点,会在控制器上注册相应的监听器,各个监听器负责监听各自代理节点的状态变化。当监听到节点状态发生变化时,会触发相应的监听函数进行处理。

    2.2 如何查看控制器优先级 ?

      控制器创建的优先级是按照Kafka系统代理节点成功启动的顺序来创建的。用户可以通过改变Kafka系统代理节点的启动顺序,来查看控制器的创建优先级。之后,可以在Zookeeper系统中查看/controller临时节点的内容,例如:

    # 进入Zookeeper集群
    [hadoop@dn1 bin]$  zkCli.sh -server dn1:2181
    
    # 执行查看命令
    [zk: dn1:2181(CONNECTED) 1] get /controller

      成功执行命令后,可以看到代理节点0(即dn1节点)上成功创建了控制器,如下图所示:

    当前启动顺序为:dn1、dn2、dn3,修改启动顺序为:dn3、dn1、dn2。再次查看Zookeeper系统中执行“get /controller”命令,输出结果如下图所示:

    2.3 切换控制器所属的代理节点

      当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的临时节点就会被清除。Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建一个控制器的临时节点。第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。每个新选举出来的控制器,会在Zookeeper系统中获取一个递增的controller_epoch值。

    3.主题分区Leader节点的选举过程

      选举控制器的核心思路是:各个代理节点公平竞争抢占Zookeeper系统中创建/controller临时节点,最先创建成功的代理节点会成为控制器,并拥有选举主题分区Leader节点的功能。选举流程如下图所示:

      当Kafka系统实例化KafkaController类时,主题分区Leader节点的选举流程便会开始。其中涉及的核心类包含KafkaController、ZookeeperLeaderElector、LeaderChangeListener、SessionExpirationListener。

    • KafkaController:在实例化ZookeeperLeaderElector类时,分别设置了两个关键的回调函数,即onControllerFailover和onControllerResignation;
    • ZookeeperLeaderElector:实现主题分区的Leader节点选举功能,但是它并不会处理“代理节点与Zookeeper系统之间出现的会话超时”这种情况,它主要负责创建元数据存储路径、实例化变更监听器等,并通过订阅数据变更监听器来实时监听数据的变化,进而开始执行选举Leader的逻辑;
    • LeaderChangeListener:如果节点数据发送变化,则Kafka系统中的其他代理节点可能已经成为Leader,接着Kafka控制器会调用onResigningAsLeader函数。当Kafka代理节点宕机或者被人为误删除时,则处于该节点上的Leader会被重新选举,通过调用onResigningAsLeader函数重新选择其他正常运行的代理节点成为新的Leader;
    • SessionExpirationListener:当Kafka系统的代理节点和Zookeeper系统建立连接后,SessionExpirationListener中的handleNewSession函数会被调用,对于Zookeeper系统中会话过期的连接,会先进行一次判断。

    4.注册分区和副本状态机

      Kafka系统的控制器主要负责管理主题、分区和副本。 Kafka系统在操作主题、分区和副本时,控制器会在Zookeeper系统的/brokers/topics节点,以及其子节点路径上注册一系列的监听器。 使用Kafka应用接口或者是Kafka系统脚本创建一个主题时,服务端会将创建后的结果返回给客户端。当客户端收到创建成功的提示时,其实服务端并没有实际创建主题,而只是在Zookeeper系统的/brokers/topics节点中创建了该主题对应的子节点名称。

      代理节点调用onBecomingLeader()函数实际上调用的是onControllerFailover()函数,所以在控制器调用onControllerFailover()函数时,会在初始化阶段分别创建分区状态机和副本状态机。代码如下所示:

    def onControllerFailover() {
        if(isRunning) {
    info("Broker %d starting become controller state
     transition".format(config.brokerId))
          readControllerEpochFromZookeeper()
          incrementControllerEpoch(zkUtils.zkClient)
    
          // 在/brokers/topics节点注册监听器
          registerReassignedPartitionsListener()
          registerIsrChangeNotificationListener()
          registerPreferredReplicaElectionListener()
          partitionStateMachine.registerListeners()      // 注册分区状态机
          replicaStateMachine.registerListeners()        // 注册副本状态机
    
          initializeControllerContext()
    
          // 在控制器初始化之后,在状态机启动之前,需要发送更新元数据请求
          sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
          
          replicaStateMachine.startup()                  // 启动副本状态机
          partitionStateMachine.startup()                // 启动分区状态机
    
          // 在自动故障转移中为所有主题注册分区更改监听器
          controllerContext.allTopics.foreach(topic => partitionStateMachine.
                  registerPartitionChangeListener(topic))
          info("Broker %d is ready to serve as the new controller with epoch %d".
                  format(config.brokerId, epoch))
          maybeTriggerPartitionReassignment()
          maybeTriggerPreferredReplicaElection()
          if (config.autoLeaderRebalanceEnable) {
            info("starting the partition rebalance scheduler")
            autoRebalanceScheduler.startup()
            autoRebalanceScheduler.schedule("partition-rebalance-thread", 
                checkAndTriggerPartitionRebalance,
                  5, 
                  config.leaderImbalanceCheckIntervalSeconds.toLong, 
                  TimeUnit.SECONDS)
          }
          deleteTopicManager.start()
        }
        else
          info("Controller has been shut down, aborting startup/failover")
    }

      主题的分区状态机通过registerListeners()函数,在Zookeeper系统中的/brokers/topics节点上注册了TopicChangeListener和DeleteTopicListener两个监听器。创建一个主题时,主题信息、主题分区和副本会被写到Zookeeper系统的/brokers/topics节点中,这就会触发分区和副本状态机注册监听器。

    5.总结

      Kafka系统整体来说,调试还算方便。下载Kafka源代码,导入到IDE中,就可以启动整个Kafka系统了,可以通过DEBUG的方式来亲自了解控制器的执行流程。

    6.结束语

      这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

      另外,博主出书了《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。 

  • 相关阅读:
    向Oracle 数据表中插入一条带有日期类型的数据
    JDBC 连接Oracle 数据库,JDBC 连接Mysql 数据库
    球球大作战四亿人都在玩?玩家回归没有优越感,新玩家游戏被虐,游戏体验感极差!
    struts2中的错误--java.lang.NoClassDefFoundError: org/apache/commons/lang3/StringUtils
    如何在idea中设置 jsp 内容修改以后,立即生效而不用重新启动服务?
    idea中 在接口中如何直接跳转到该接口的是实现类中?
    使用IDEA 创建Servlet 的时候,找不到javax.servlet
    如何高效的遍历HashMap 以及对key 进行排序
    springboot 自动装配
    git 多账户添加ssh秘钥
  • 原文地址:https://www.cnblogs.com/smartloli/p/9826923.html
Copyright © 2011-2022 走看看