zoukankan      html  css  js  c++  java
  • 【Kafka源码】Kafka启动过程

    一般来说,我们是通过命令来启动kafka,但是命令的本质还是调用代码中的main方法,所以,我们重点看下启动类Kafka。源码下下来之后,我们也可以通过直接运行Kafka.scala中的main方法(需要指定启动参数,也就是server.properties的位置)来启动Kafka。因为kafka依赖zookeeper,所以我们需要提前启动zookeeper,然后在server.properties中指定zk地址后,启动。

    下面我们首先看一下main()方法:

      def main(args: Array[String]): Unit = {
        try {
          val serverProps = getPropsFromArgs(args)
          val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    
          // attach shutdown handler to catch control-c
          Runtime.getRuntime().addShutdownHook(new Thread() {
            override def run() = {
              kafkaServerStartable.shutdown
            }
          })
    
          kafkaServerStartable.startup
          kafkaServerStartable.awaitShutdown
        }
        catch {
          case e: Throwable =>
            fatal(e)
            System.exit(1)
        }
        System.exit(0)
      }
    

    我们慢慢来分析下,首先是getPropsFromArgs(args),这一行很明确,就是从配置文件中读取我们配置的内容,然后赋值给serverProps。第二步,KafkaServerStartable.fromProps(serverProps),

    object KafkaServerStartable {
      def fromProps(serverProps: Properties) = {
        KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
        new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
      }
    }
    

    这块主要是启动了一个内部的监控服务(内部状态监控)。

    下面是一个在java中常见的钩子函数,在关闭时会启动一些销毁程序,保证程序安全关闭。之后就是我们启动的重头戏了:kafkaServerStartable.startup。跟进去可以很清楚的看到,里面调用的方法是KafkaServer中的startup方法,下面我们重点看下这个方法(比较长):

    def startup() {
        try {
          info("starting")
    
          if(isShuttingDown.get)
            throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
    
          if(startupComplete.get)
            return
    
          val canStartup = isStartingUp.compareAndSet(false, true)
          if (canStartup) {
            metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
    
            brokerState.newState(Starting)
    
            /* start scheduler */
            kafkaScheduler.startup()
    
            /* setup zookeeper */
            zkUtils = initZk()
    
            /* start log manager */
            logManager = createLogManager(zkUtils.zkClient, brokerState)
            logManager.startup()
    
            /* generate brokerId */
            config.brokerId =  getBrokerId
            this.logIdent = "[Kafka Server " + config.brokerId + "], "
    
            socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
            socketServer.startup()
    
            /* start replica manager */
            replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
              isShuttingDown)
            replicaManager.startup()
    
            /* start kafka controller */
            kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
            kafkaController.startup()
    
            /* start group coordinator */
            groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
            groupCoordinator.startup()
    
            /* Get the authorizer and initialize it if one is specified.*/
            authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
              val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
              authZ.configure(config.originals())
              authZ
            }
    
            /* start processing requests */
            apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
              kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
            requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
            brokerState.newState(RunningAsBroker)
    
            Mx4jLoader.maybeLoad()
    
            /* start dynamic config manager */
            dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
                                                               ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))
    
            // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
            // TODO: Move this logic to DynamicConfigManager
            AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
              case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
            }
    
            // Create the config manager. start listening to notifications
            dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
            dynamicConfigManager.startup()
    
            /* tell everyone we are alive */
            val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
              if (endpoint.port == 0)
                (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
              else
                (protocol, endpoint)
            }
            kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
              config.interBrokerProtocolVersion)
            kafkaHealthcheck.startup()
    
            // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
            checkpointBrokerId(config.brokerId)
    
            /* register broker metrics */
            registerStats()
    
            shutdownLatch = new CountDownLatch(1)
            startupComplete.set(true)
            isStartingUp.set(false)
            AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
            info("started")
          }
        }
        catch {
          case e: Throwable =>
            fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
            isStartingUp.set(false)
            shutdown()
            throw e
        }
      }
    

    首先判断是否目前正在关闭中或者已经启动了,这两种情况直接抛出异常。然后是一个CAS的操作isStartingUp,防止线程并发操作启动,判断是否可以启动。如果可以启动,就开始我们的启动过程。

    • 构造Metrics类
    • 定义broker状态为启动中starting
    • 启动定时器kafkaScheduler.startup()
    • 构造zkUtils:利用参数中的zk信息,启动一个zk客户端
    • 启动文件管理器:读取zk中的配置信息,包含__consumer_offsets和__system.topic__。重点是启动一些定时任务,来删除符合条件的记录(cleanupLogs),清理脏记录(flushDirtyLogs),把所有记录写到一个文本文件中,防止在启动时重启所有的记录文件(checkpointRecoveryPointOffsets)。
      /**
       *  Start the background threads to flush logs and do log cleanup
       */
      def startup() {
        /* Schedule the cleanup task to delete old logs */
        if(scheduler != null) {
          info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
          scheduler.schedule("kafka-log-retention", 
                             cleanupLogs, 
                             delay = InitialTaskDelayMs, 
                             period = retentionCheckMs, 
                             TimeUnit.MILLISECONDS)
          info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
          scheduler.schedule("kafka-log-flusher", 
                             flushDirtyLogs, 
                             delay = InitialTaskDelayMs, 
                             period = flushCheckMs, 
                             TimeUnit.MILLISECONDS)
          scheduler.schedule("kafka-recovery-point-checkpoint",
                             checkpointRecoveryPointOffsets,
                             delay = InitialTaskDelayMs,
                             period = flushCheckpointMs,
                             TimeUnit.MILLISECONDS)
        }
        if(cleanerConfig.enableCleaner)
          cleaner.startup()
      }
    
    • 下一步,获取brokerId
    • 启动一个NIO socket服务
    • 启动复制管理器:启动ISR超时处理线程
    • 启动kafka控制器:注册session过期监听器,同时启动控制器leader选举
    • 启动协调器
    • 权限认证
    • 开启线程,开始处理请求
    • 开启配置监听,主要是监听zk节点数据变化,然后广播到所有机器
    • 开启健康检查:目前只是把broker节点注册到zk上,注册成功就是活的,否则就是dead
    • 注册启动数据信息
    • 启动成功
    • 等待关闭countDownLatch,如果shutdownLatch变为0,则关闭Kafka
  • 相关阅读:
    iframe透明
    c#创建可以为空类型
    div仿框架布局
    IBatis.Net学习笔记(六):Castle.DynamicProxy的使用
    很好玩的谷歌纵横
    TFS签入签出规范
    ibatis学习笔记
    iBATIS.net调用存储过程
    最新28个很棒的 jQuery 教程
    IBatis.Net 中的数据类型转换
  • 原文地址:https://www.cnblogs.com/f-zhao/p/7717874.html
Copyright © 2011-2022 走看看