  • kafka源码分析(一)server启动分析


    1 启动入口Kafka.scala


     // Broker entry point: builds a KafkaServerStartable from the properties parsed out of args,
     // registers a JVM shutdown hook that calls its shutdown, then calls startup and awaitShutdown.
     // Any Throwable is logged through fatal() and the JVM exits with status 1; otherwise with status 0.
     1 def main(args: Array[String]): Unit = {
     3     try {
     4       // read the server properties from args
     5       val serverProps = getPropsFromArgs(args)
     6       val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
     8       // register a shutdown hook that shuts the server down
     9       Runtime.getRuntime().addShutdownHook(new Thread() {
    10         override def run() = {
    12           kafkaServerStartable.shutdown
    13         }
    14       })
    16       kafkaServerStartable.startup
    17       kafkaServerStartable.awaitShutdown
    18     }
    19     catch {
    20       case e: Throwable =>
    21         fatal(e)
    22         System.exit(1)
    23     }
    24     System.exit(0)
    25   }



    • 创建KafkaServerStartable对象;
    • KafkaServerStartable对象增加shutdown函数;
    • 调用KafkaServerStartable的startup()方法;
    • 启动KafkaServerStartable的awaitShutdown()方法;

    2 KafkaServer的包装类KafkaServerStartable

     // Delegates to server.startup(). If that throws any Throwable, logs a fatal message
     // and exits the JVM with status 1.
     1 def startup() {
     2     try {
     3       server.startup()
     4     }
     5     catch {
     6       case e: Throwable =>
     7         fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
     8         System.exit(1)
     9     }
    10   }

    3 具体启动类KafkaServer


      1 /**
      2     * Startup entry point of the Kafka server instance.
      3     * Throws IllegalStateException while a shutdown is in progress and returns immediately if startup already completed.
      4     * Otherwise instantiates and starts, in order, the scheduler, ZooKeeper utilities, LogManager, SocketServer, ReplicaManager, KafkaController, GroupCoordinator, KafkaApis, the KafkaRequestHandlerPool, the dynamic config manager and the health check.
            * On any Throwable it logs a fatal message, resets isStartingUp, calls shutdown() and rethrows.
      5     */
      6   def startup() {
      7     try {
      9       if (isShuttingDown.get)
     10         throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
     12       if (startupComplete.get)
     13         return
     15       val canStartup = isStartingUp.compareAndSet(false, true)
     16       if (canStartup) {
     17         brokerState.newState(Starting)
     19         /* start scheduler */
     20         kafkaScheduler.startup()
     22         /* setup zookeeper */
     23         zkUtils = initZk()
     25         /* Get or create cluster_id */
     26         _clusterId = getOrGenerateClusterId(zkUtils)
     27         info(s"Cluster ID = $clusterId")
     29         /* generate brokerId */
     30         config.brokerId = getBrokerId
     31         this.logIdent = "[Kafka Server " + config.brokerId + "], "
     33         /* create and configure metrics */
     34         val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
     35           Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
     36         reporters.add(new JmxReporter(jmxPrefix))
     37         val metricConfig = KafkaServer.metricConfig(config)
     38         metrics = new Metrics(metricConfig, reporters, time, true)
     40         quotaManagers = QuotaFactory.instantiate(config, metrics, time)
     41         notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
     43         /* start log manager */
     44         logManager = createLogManager(zkUtils.zkClient, brokerState)
     45         logManager.startup()
     47         metadataCache = new MetadataCache(config.brokerId)
     48         credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
     50         socketServer = new SocketServer(config, metrics, time, credentialProvider)
     51         socketServer.startup()
     53         /* start replica manager */
     54         replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
     55           isShuttingDown, quotaManagers.follower)
     56         replicaManager.startup()
     58         /* start kafka controller */
     59         kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
     60         kafkaController.startup()
     62         adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
     64         /* start group coordinator */
     65         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
     66         groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
     67         groupCoordinator.startup()
     69         /* Get the authorizer and initialize it if one is specified.*/
     70         authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
     71           val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
     72           authZ.configure(config.originals())
     73           authZ
     74         }
     76         /* start processing requests */
     77         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
     78           kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
     79           clusterId, time)
     81         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
     82           config.numIoThreads)
     84         Mx4jLoader.maybeLoad()
     86         /* start dynamic config manager */
     87         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
     88           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
     89           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
     90           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
     92         // Create the config manager. start listening to notifications
     93         dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
     94         dynamicConfigManager.startup()
     96         /* tell everyone we are alive */
     97         val listeners = config.advertisedListeners.map { endpoint =>
     98           if (endpoint.port == 0)
     99             endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
    100           else
    101             endpoint
    102         }
    103         kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
    104           config.interBrokerProtocolVersion)
    105         kafkaHealthcheck.startup()
    107         // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
    108         checkpointBrokerId(config.brokerId)
    110         /* register broker metrics */
    111         registerStats()
    113         brokerState.newState(RunningAsBroker)
    114         shutdownLatch = new CountDownLatch(1)
    115         startupComplete.set(true)
    116         isStartingUp.set(false)
    117         AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
    118         info("started")
    119       }
    120     }
    121     catch {
    122       case e: Throwable =>
    123         fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
    124         isStartingUp.set(false)
    125         shutdown()
    126         throw e
    127     }
    128   }

    3.1 KafkaScheduler


     1 /**
     2   * KafkaScheduler is a scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor.
     3   * Internally it uses a pool of threads named with the kafka-scheduler- prefix to do the actual work.
     4   *
     5   * @param threads          The number of threads in the thread pool
     6   * @param threadNamePrefix The name prefix for the scheduler threads; a number is appended to this prefix
     7   * @param daemon           If true the threads are daemon threads and will not block JVM shutdown
     8   */
     9 @threadsafe
    10 class KafkaScheduler(val threads: Int,
    11                      val threadNamePrefix: String = "kafka-scheduler-",
    12                      daemon: Boolean = true) extends Scheduler with Logging {
    13   private var executor: ScheduledThreadPoolExecutor = null
    14   private val schedulerThreadId = new AtomicInteger(0)
    16   override def startup() {
    17     debug("Initializing task scheduler.")
    18     this synchronized {
    19       if (isStarted)
    20         throw new IllegalStateException("This scheduler has already been started!")
    21       executor = new ScheduledThreadPoolExecutor(threads)
    22       executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    23       executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
    24       executor.setThreadFactory(new ThreadFactory() {
    25         def newThread(runnable: Runnable): Thread =
    26           Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
    27       })
    28     }
    29   }

    3.2 zk初始化


    1     // connect to the ZooKeeper server, then create the common paths
    2     val zkUtils = ZkUtils(config.zkConnect,
    3       sessionTimeout = config.zkSessionTimeoutMs,
    4       connectionTimeout = config.zkConnectionTimeoutMs,
    5       secureAclsEnabled)
    6     zkUtils.setupCommonPaths()


     1   // These are the paths that should exist when the kafka broker starts up
     2   val persistentZkPaths = Seq(ConsumersPath,
     3     BrokerIdsPath,
     4     BrokerTopicsPath,
     5     ConfigChangesPath,
     6     getEntityConfigRootPath(ConfigType.Topic),
     7     getEntityConfigRootPath(ConfigType.Client),
     8     DeleteTopicsPath,
     9     BrokerSequenceIdPath,
    10     IsrChangeNotificationPath)

    3.3 日志管理器LogManager


     1 /**
     2     * Starts the log manager's background tasks: periodic log retention cleanup, log flushing,
           * recovery-point checkpointing and log deletion (all skipped when no scheduler is set),
           * and then the log cleaner if cleanerConfig.enableCleaner is true.
     3     */
     4   def startup() {
     5     /* Schedule the cleanup task to delete old logs */
     6     if (scheduler != null) {
     7       info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
     8       scheduler.schedule("kafka-log-retention",
     9         cleanupLogs,
    10         delay = InitialTaskDelayMs,
    11         period = retentionCheckMs,
    12         TimeUnit.MILLISECONDS)
    13       info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    14       scheduler.schedule("kafka-log-flusher",
    15         flushDirtyLogs,
    16         delay = InitialTaskDelayMs,
    17         period = flushCheckMs,
    18         TimeUnit.MILLISECONDS)
    19       scheduler.schedule("kafka-recovery-point-checkpoint",
    20         checkpointRecoveryPointOffsets,
    21         delay = InitialTaskDelayMs,
    22         period = flushCheckpointMs,
    23         TimeUnit.MILLISECONDS)
    24       scheduler.schedule("kafka-delete-logs",
    25         deleteLogs,
    26         delay = InitialTaskDelayMs,
    27         period = defaultConfig.fileDeleteDelayMs,
    28         TimeUnit.MILLISECONDS)
    29     }
    30     if (cleanerConfig.enableCleaner)
    31       cleaner.startup()
    32   }

    3.4 SocketServer

    1 /**
    2   * SocketServer is the broker's socket server.
    3   * Threading model (as stated by this comment; the class body is not shown in this excerpt): one Acceptor thread handles new connections; the Acceptor has several Processor threads, each with its own selector, which read requests from the sockets.
    4   * Handler threads process the requests and produce responses that are handed back to the Processor threads for writing.
    5   */
    6 class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {

    3.5 复制管理器ReplicaManager


    // Schedules the two periodic ISR tasks ("isr-expiration" and "isr-change-propagation") on the scheduler.
    1 def startup() {
    2     // start the ISR expiration task
    3     // a follower may lag behind the leader for a configured time before it is removed from the ISR; this check runs every config.replicaLagTimeMaxMs / 2 ms
    4     scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
    5     scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
    6   }

    3.6 kafka控制器KafkaController

    当kafka 服务器的控制器模块启动时激活

    // While holding controllerContext.controllerLock: registers the session expiration listener,
    // marks the controller as running (isRunning = true) and starts the controller elector.
    1 def startup() = {
    2     inLock(controllerContext.controllerLock) {
    3       info("Controller starting up")
    4       registerSessionExpirationListener()
    5       isRunning = true
    6       controllerElector.startup
    7       info("Controller startup complete")
    8     }
    9   }


     1 private def registerSessionExpirationListener() = {
     2     zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
     3   }
     4     public void subscribeStateChanges(final IZkStateListener listener) {
     5         synchronized (_stateListener) {
     6             _stateListener.add(listener);
     7         }
     8     }
    11  class SessionExpirationListener() extends IZkStateListener with Logging {
    12       this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
    13       @throws(classOf[Exception])
    14       def handleStateChanged(state: KeeperState) {
    15         // do nothing, since zkclient will do reconnect for us.
    16     }


    def startup {
        inLock(controllerContext.controllerLock) {
          controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
    def elect: Boolean = {
        val timestamp = SystemTime.milliseconds.toString
        val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
       leaderId = getControllerID 
         * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, 
         * it's possible that the controller has already been elected when we get here. This check will prevent the following 
         * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
        if(leaderId != -1) {
           debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
           return amILeader
        try {
          val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
          info(brokerId + " successfully elected as leader")
          leaderId = brokerId
        } catch {
          case e: ZkNodeExistsException =>
            // If someone else has written the path, then
            leaderId = getControllerID 
            if (leaderId != -1)
              debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
              warn("A leader has been elected but just resigned, this will result in another round of election")
          case e2: Throwable =>
            error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
     def amILeader : Boolean = leaderId == brokerId

    3.7 GroupCoordinator


    // Creates the "Heartbeat" and "Rebalance" delayed-operation purgatories for this broker
    // and then marks the coordinator as active (isActive = true).
    1 def startup() {
    2     info("Starting up.")
    3     heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
    4     joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
    5     isActive.set(true)
    6     info("Startup complete.")
    7   }


    3.8 KafkaApis消息处理接口

     1 /**
     2    * Top-level method that handles all requests and multiplexes to the right api
          *
          * Dispatches on request.requestId to the matching handleXxx method; an unknown id raises KafkaException.
          * Any Throwable raised while handling is turned into an error reply: via requestObj.handleError when
          * requestObj is set, otherwise via the error response built from request.body (or by closing the
          * connection when no error response exists). The error is then logged, and apiLocalCompleteTimeMs is
          * always recorded in the finally clause.
     3    */
     4   def handle(request: RequestChannel.Request) {
     5     try{
     6       trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
     7         format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
     8       request.requestId match {
     9         case RequestKeys.ProduceKey => handleProducerRequest(request)
    10         case RequestKeys.FetchKey => handleFetchRequest(request)
    11         case RequestKeys.OffsetsKey => handleOffsetRequest(request)
    12         case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
    13         case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
    14         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
    15         case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
    16         case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
    17         case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
    18         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
    19         case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
    20         case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
    21         case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
    22         case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
    23         case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
    24         case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
    25         case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
    26         case requestId => throw new KafkaException("Unknown api code " + requestId)
    27       }
    28     } catch {
    29       case e: Throwable =>
    30         if ( request.requestObj != null)
    31           request.requestObj.handleError(e, requestChannel, request)
    32         else {
    33           val response = request.body.getErrorResponse(request.header.apiVersion, e)
    34           val respHeader = new ResponseHeader(request.header.correlationId)
    36           /* If request doesn't have a default error response, we just close the connection.
    37              For example, when produce request has acks set to 0 */
    38           if (response == null)
    39             requestChannel.closeConnection(request.processor, request)
    40           else
    41             requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
    42         }
    43         error("error when handling request %s".format(request.requestObj), e)
    44     } finally
    45       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
    46   }


     1 /**
     2    * Handle a produce request
          *
          * Splits the request data into partitions whose topic the session may Write to and those it may not.
          * Authorized data is appended through replicaManager.appendMessages; unauthorized partitions are
          * answered with ErrorMapping.TopicAuthorizationCode. When requiredAcks is 0 no response is sent:
          * the connection is closed if any partition failed, otherwise a no-op is queued.
          * For any other acks value a ProducerResponse carrying the merged per-partition status is sent.
     3    */
     4   def handleProducerRequest(request: RequestChannel.Request) {
     5     val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
     6     val numBytesAppended = produceRequest.sizeInBytes
     8     val (authorizedRequestInfo, unauthorizedRequestInfo) =  produceRequest.data.partition  {
     9       case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))
    10     }
    12     // the callback for sending a produce response
    13     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
    15       val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
    17       var errorInResponse = false
    19       mergedResponseStatus.foreach { case (topicAndPartition, status) =>
    20         if (status.error != ErrorMapping.NoError) {
    21           errorInResponse = true
    22           debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
    23             produceRequest.correlationId,
    24             produceRequest.clientId,
    25             topicAndPartition,
    26             ErrorMapping.exceptionNameFor(status.error)))
    27         }
    28       }
    30       def produceResponseCallback(delayTimeMs: Int) {
    32         if (produceRequest.requiredAcks == 0) {
    33           // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
    34           // the request, since no response is expected by the producer, the server will close socket server so that
    35           // the producer client will know that some error has happened and will refresh its metadata
    36           if (errorInResponse) {
    37             val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
    38               topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)
    39             }.mkString(", ")
    40             info(
    41               s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
    42                 s"from client id ${produceRequest.clientId} with ack=0\n" +
    43                 s"Topic and partition to exceptions: $exceptionsSummary"
    44             )
    45             requestChannel.closeConnection(request.processor, request)
    46           } else {
    47             requestChannel.noOperation(request.processor, request)
    48           }
    49         } else {
    50           val response = ProducerResponse(produceRequest.correlationId,
    51                                           mergedResponseStatus,
    52                                           produceRequest.versionId,
    53                                           delayTimeMs)
    54           requestChannel.sendResponse(new RequestChannel.Response(request,
    55                                                                   new RequestOrResponseSend(request.connectionId,
    56                                                                                             response)))
    57         }
    58       }
    60       // When this callback is triggered, the remote API call has completed
    61       request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
    63       quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
    64                                                                    numBytesAppended,
    65                                                                    produceResponseCallback)
    66     }
    68     if (authorizedRequestInfo.isEmpty)
    69       sendResponseCallback(Map.empty)
    70     else {
    71       val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
    73       // call the replica manager to append messages to the replicas
    74       replicaManager.appendMessages(
    75         produceRequest.ackTimeoutMs.toLong,
    76         produceRequest.requiredAcks,
    77         internalTopicsAllowed,
    78         authorizedRequestInfo,
    79         sendResponseCallback)
    81       // if the request is put into the purgatory, it will have a held reference
    82       // and hence cannot be garbage collected; hence we clear its data here in
    83       // order to let GC re-claim its memory since it is already appended to log
    84       produceRequest.emptyData()
    85     }
    86   }


    3.9 动态配置管理DynamicConfigManager


     1 /**
     2    * Begin watching for config changes: makes sure the entity config-changes path exists,
          * subscribes ConfigChangeListener to child changes on that path, then processes the
          * changes that are already present.
     3    */
     4   def startup() {
     5     zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
     6     zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
     7     processAllConfigChanges()
     8   }
    10   /**
    11    * Process all config changes: reads the children of the entity config-changes path,
           * sorts them and hands the sorted list to processConfigChanges.
    12    */
    13   private def processAllConfigChanges() {
    14     val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
           // NOTE(review): the ascription below presumably relies on the implicit Java-to-Scala conversion imported here - confirm
    15     import JavaConversions._
    16     processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
    17   }
    19   /**
    20    * Process the given list of config changes
           *
           * For each notification whose change number is greater than lastExecutedChange, reads the
           * notification znode and passes its content to processNotification. lastExecutedChange is set to
           * every notification's change number in turn, whether or not it was processed. Afterwards obsolete
           * notifications are purged, using the time captured before the loop. Does nothing for an empty list.
           *
           * @param notifications names of the notification znodes under the entity config-changes path
    21    */
    22   private def processConfigChanges(notifications: Seq[String]) {
    23     if (notifications.size > 0) {
    24       info("Processing config change notification(s)...")
    25       val now = time.milliseconds
    26       for (notification <- notifications) {
    27         val changeId = changeNumber(notification)
    29         if (changeId > lastExecutedChange) {
    30           val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
    32           val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
    33           processNotification(jsonOpt)
    34         }
    35         lastExecutedChange = changeId
    36       }
    37       purgeObsoleteNotifications(now, notifications)
    38     }
    39   }

    3.10 心跳检测KafkaHealthcheck


     // Subscribes sessionExpireListener to ZooKeeper state changes, then registers this broker via register().
     1 def startup() {
     2     zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
     3     register()
     4   }
     6   /**
     7    * Register this broker as "alive" in zookeeper
          *
          * The JMX port is read from the com.sun.management.jmxremote.port system property (-1 when unset).
          * Advertised endpoints with a null or blank host get the local canonical host name instead.
          * The PLAINTEXT endpoint, or an empty placeholder endpoint when there is none, supplies the
          * host and port passed to registerBrokerInZk alongside the full endpoint map.
     8    */
     9   def register() {
    10     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
    11     val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
    12       if (endpoint.host == null || endpoint.host.trim.isEmpty)
    13         EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType)
    14       else
    15         endpoint
    16     )
    18     // the default host and port are here for compatibility with older client
    19     // only PLAINTEXT is supported as default
    20     // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
    21     val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
    22     zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
    23   }

    4 小结


    ReplicaManager    副本管理

    KafkaApis    处理所有request的Proxy类,根据requestKey决定调用具体的handler

    KafkaRequestHandlerPool 处理request的线程池,请求处理池

    LogManager    kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partition数据

    TopicConfigManager  监听zk节点/config/changes/的子节点,通过LogManager更新topic的配置信息,topic粒度配置管理

    KafkaHealthcheck 监听zk session expire,在zk上创建broker信息,便于其他broker和consumer获取其信息

    KafkaController  kafka集群中央控制器选举,leader选举,副本分配。

    KafkaScheduler  负责副本管理和日志管理调度等等

    ZkClient         负责注册zk相关信息.

    BrokerTopicStats  topic信息统计和监控

    ControllerStats          中央控制器统计和监控

