zoukankan      html  css  js  c++  java
  • kafka源码分析(一)server启动分析

     

    1 启动入口Kafka.scala

    Kafka的启动入口是Kafka.scala的main()函数:

     1 def main(args: Array[String]): Unit = {
     2 
     3     try {
     4       //通过args读取properties
     5       val serverProps = getPropsFromArgs(args)
     6       val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
     7 
     8       // 增加shutdown方法
     9       Runtime.getRuntime().addShutdownHook(new Thread() {
    10         override def run() = {
    11 
    12           kafkaServerStartable.shutdown
    13         }
    14       })
    15 
    16       kafkaServerStartable.startup
    17       kafkaServerStartable.awaitShutdown
    18     }
    19     catch {
    20       case e: Throwable =>
    21         fatal(e)
    22         System.exit(1)
    23     }
    24     System.exit(0)
    25   }

     上面代码主要包含:

    从配置文件读取kafka服务器启动参数的getPropsFromArgs()方法;

    • 创建KafkaServerStartable对象;
    • KafkaServerStartable对象增加shutdown函数;
    • 调用KafkaServerStartable的startup()方法;
    • 调用KafkaServerStartable的awaitShutdown()方法;

    2 KafkaServer的包装类KafkaServerStartable

     1 def startup() {
     2     try {
     3       server.startup()
     4     }
     5     catch {
     6       case e: Throwable =>
     7         fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
     8         System.exit(1)
     9     }
    10   }

    3 具体启动类KafkaServer

    KafkaServer启动的代码层次比较清晰,加上注释,基本没有问题:

      1 /**
      2     * 启动接口
      3     * 生成Kafka server实例
      4     * 实例化LogManager、SocketServer和KafkaRequestHandlers
      5     */
      6   def startup() {
      7     try {
      8 
      9       if (isShuttingDown.get)
     10         throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
     11 
     12       if (startupComplete.get)
     13         return
     14 
     15       val canStartup = isStartingUp.compareAndSet(false, true)
     16       if (canStartup) {
     17         brokerState.newState(Starting)
     18 
     19         /* start scheduler */
     20         kafkaScheduler.startup()
     21 
     22         /* setup zookeeper */
     23         zkUtils = initZk()
     24 
     25         /* Get or create cluster_id */
     26         _clusterId = getOrGenerateClusterId(zkUtils)
     27         info(s"Cluster ID = $clusterId")
     28 
     29         /* generate brokerId */
     30         config.brokerId = getBrokerId
     31         this.logIdent = "[Kafka Server " + config.brokerId + "], "
     32 
     33         /* create and configure metrics */
     34         val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
     35           Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
     36         reporters.add(new JmxReporter(jmxPrefix))
     37         val metricConfig = KafkaServer.metricConfig(config)
     38         metrics = new Metrics(metricConfig, reporters, time, true)
     39 
     40         quotaManagers = QuotaFactory.instantiate(config, metrics, time)
     41         notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
     42 
     43         /* start log manager */
     44         logManager = createLogManager(zkUtils.zkClient, brokerState)
     45         logManager.startup()
     46 
     47         metadataCache = new MetadataCache(config.brokerId)
     48         credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
     49 
     50         socketServer = new SocketServer(config, metrics, time, credentialProvider)
     51         socketServer.startup()
     52 
     53         /* start replica manager */
     54         replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
     55           isShuttingDown, quotaManagers.follower)
     56         replicaManager.startup()
     57 
     58         /* start kafka controller */
     59         kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
     60         kafkaController.startup()
     61 
     62         adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
     63 
     64         /* start group coordinator */
     65         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
     66         groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
     67         groupCoordinator.startup()
     68 
     69         /* Get the authorizer and initialize it if one is specified.*/
     70         authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
     71           val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
     72           authZ.configure(config.originals())
     73           authZ
     74         }
     75 
     76         /* start processing requests */
     77         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
     78           kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
     79           clusterId, time)
     80 
     81         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
     82           config.numIoThreads)
     83 
     84         Mx4jLoader.maybeLoad()
     85 
     86         /* start dynamic config manager */
     87         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
     88           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
     89           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
     90           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
     91 
     92         // Create the config manager. start listening to notifications
     93         dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
     94         dynamicConfigManager.startup()
     95 
     96         /* tell everyone we are alive */
     97         val listeners = config.advertisedListeners.map { endpoint =>
     98           if (endpoint.port == 0)
     99             endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
    100           else
    101             endpoint
    102         }
    103         kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
    104           config.interBrokerProtocolVersion)
    105         kafkaHealthcheck.startup()
    106 
    107         // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
    108         checkpointBrokerId(config.brokerId)
    109 
    110         /* register broker metrics */
    111         registerStats()
    112 
    113         brokerState.newState(RunningAsBroker)
    114         shutdownLatch = new CountDownLatch(1)
    115         startupComplete.set(true)
    116         isStartingUp.set(false)
    117         AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
    118         info("started")
    119       }
    120     }
    121     catch {
    122       case e: Throwable =>
    123         fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
    124         isStartingUp.set(false)
    125         shutdown()
    126         throw e
    127     }
    128   }

    3.1 KafkaScheduler

    KafkaScheduler是一个基于java.util.concurrent.ScheduledThreadPoolExecutor的调度器,它内部是以前缀kafka-scheduler-xx(xx是线程序列号)的线程池处理真正的工作。

     1 /**
     2   * KafkaScheduler是一个基于java.util.concurrent.ScheduledThreadPoolExecutor的scheduler
     3   * 它内部是以前缀kafka-scheduler-xx的线程池处理真正的工作
     4   *
     5   * @param threads          线程池里线程的数量
     6   * @param threadNamePrefix 使用时的线程名称,这个前缀将有一个附加的数字
     7   * @param daemon           如果为true,线程将是守护线程,并且不会阻塞jvm关闭
     8   */
     9 @threadsafe
    10 class KafkaScheduler(val threads: Int,
    11                      val threadNamePrefix: String = "kafka-scheduler-",
    12                      daemon: Boolean = true) extends Scheduler with Logging {
    13   private var executor: ScheduledThreadPoolExecutor = null
    14   private val schedulerThreadId = new AtomicInteger(0)
    15 
    16   override def startup() {
    17     debug("Initializing task scheduler.")
    18     this synchronized {
    19       if (isStarted)
    20         throw new IllegalStateException("This scheduler has already been started!")
    21       executor = new ScheduledThreadPoolExecutor(threads)
    22       executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    23       executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
    24       executor.setThreadFactory(new ThreadFactory() {
    25         def newThread(runnable: Runnable): Thread =
    26           Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
    27       })
    28     }
    29   }

    3.2 zk初始化

    zookeeper初始化主要完成两件事情:

    1     // 连接到zk服务器;创建通用节点
    2     val zkUtils = ZkUtils(config.zkConnect,
    3       sessionTimeout = config.zkSessionTimeoutMs,
    4       connectionTimeout = config.zkConnectionTimeoutMs,
    5       secureAclsEnabled)
    6     zkUtils.setupCommonPaths()

    通用节点包括:

     1   // 这些是在kafka代理启动时应该存在的路径
     2   val persistentZkPaths = Seq(ConsumersPath,
     3     BrokerIdsPath,
     4     BrokerTopicsPath,
     5     ConfigChangesPath,
     6     getEntityConfigRootPath(ConfigType.Topic),
     7     getEntityConfigRootPath(ConfigType.Client),
     8     DeleteTopicsPath,
     9     BrokerSequenceIdPath,
    10     IsrChangeNotificationPath)

    3.3 日志管理器LogManager

    LogManager是kafka的子系统,负责log的创建、检索及清理。所有的读写操作都委托给各个具体的Log实例来完成。

     1 /**
     2     * 启动后台线程,负责log的创建,检索及清理
     3     */
     4   def startup() {
     5     /* Schedule the cleanup task to delete old logs */
     6     if (scheduler != null) {
     7       info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
     8       scheduler.schedule("kafka-log-retention",
     9         cleanupLogs,
    10         delay = InitialTaskDelayMs,
    11         period = retentionCheckMs,
    12         TimeUnit.MILLISECONDS)
    13       info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    14       scheduler.schedule("kafka-log-flusher",
    15         flushDirtyLogs,
    16         delay = InitialTaskDelayMs,
    17         period = flushCheckMs,
    18         TimeUnit.MILLISECONDS)
    19       scheduler.schedule("kafka-recovery-point-checkpoint",
    20         checkpointRecoveryPointOffsets,
    21         delay = InitialTaskDelayMs,
    22         period = flushCheckpointMs,
    23         TimeUnit.MILLISECONDS)
    24       scheduler.schedule("kafka-delete-logs",
    25         deleteLogs,
    26         delay = InitialTaskDelayMs,
    27         period = defaultConfig.fileDeleteDelayMs,
    28         TimeUnit.MILLISECONDS)
    29     }
    30     if (cleanerConfig.enableCleaner)
    31       cleaner.startup()
    32   }

    3.4 SocketServer

    1 /**
    2   * SocketServer是socket服务器,
    3   * 线程模型是:1个Acceptor线程处理新连接;Acceptor下有N个Processor(处理器)线程,每个Processor线程拥有自己的选择器(selector),负责从socket读取请求;
    4   * 另有M个Handler线程负责处理请求,并把产生的响应交回给Processor线程写出
    5   */
    6 class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {

    3.5 复制管理器ReplicaManager

    启动ISR线程

    1 def startup() {
    2     // 启动ISR过期线程
    3     // follower最多可以落后leader config.replicaLagTimeMaxMs x 1.5 的时间,超过之后才会被从ISR中移除
    4     scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
    5     scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
    6   }

    3.6 kafka控制器KafkaController

    在kafka服务器的控制器模块启动时调用:注册session过期监听器,并启动控制器选举器(此时当前broker不一定是控制器)

    1 def startup() = {
    2     inLock(controllerContext.controllerLock) {
    3       info("Controller starting up")
    4       registerSessionExpirationListener()
    5       isRunning = true
    6       controllerElector.startup
    7       info("Controller startup complete")
    8     }
    9   }

    session过期监听器注册:

     1 private def registerSessionExpirationListener() = {
     2     zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
     3   }
     4     public void subscribeStateChanges(final IZkStateListener listener) {
     5         synchronized (_stateListener) {
     6             _stateListener.add(listener);
     7         }
     8     }
     9 
    10 
    11  class SessionExpirationListener() extends IZkStateListener with Logging {
    12       this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
    13       @throws(classOf[Exception])
    14       def handleStateChanged(state: KeeperState) {
    15         // do nothing, since zkclient will do reconnect for us.
    16     }

    选主过程:

    def startup {
        inLock(controllerContext.controllerLock) {
          controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
          elect
        }
      }
    
    def elect: Boolean = {
        val timestamp = SystemTime.milliseconds.toString
        val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
       
       leaderId = getControllerID 
        /* 
         * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, 
         * it's possible that the controller has already been elected when we get here. This check will prevent the following 
         * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
         */
        if(leaderId != -1) {
           debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
           return amILeader
        }
    
        try {
          val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                          electString,
                                                          controllerContext.zkUtils.zkConnection.getZookeeper,
                                                          JaasUtils.isZkSecurityEnabled())
          zkCheckedEphemeral.create()
          info(brokerId + " successfully elected as leader")
          leaderId = brokerId
          onBecomingLeader()
        } catch {
          case e: ZkNodeExistsException =>
            // If someone else has written the path, then
            leaderId = getControllerID 
    
            if (leaderId != -1)
              debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
            else
              warn("A leader has been elected but just resigned, this will result in another round of election")
    
          case e2: Throwable =>
            error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
            resign()
        }
        amILeader
      }
    
     def amILeader : Boolean = leaderId == brokerId

    3.7 GroupCoordinator

    GroupCoordinator负责消费组的成员管理和offset管理。每个kafka服务器都会初始化一个协调器(coordinator),负责管理一部分消费组;各个消费组根据其组名被分配给对应的协调器。

    1 def startup() {
    2     info("Starting up.")
    3     heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
    4     joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
    5     isActive.set(true)
    6     info("Startup complete.")
    7   }

    注意:若同时需要一个组锁和元数据锁,请务必保证先获取组锁,然后获取元数据锁来防止死锁。

    3.8 KafkaApis消息处理接口

     1 /**
     2    * Top-level method that handles all requests and multiplexes to the right api
     3    */
     4   def handle(request: RequestChannel.Request) {
     5     try{
     6       trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
     7         format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
     8       request.requestId match {
     9         case RequestKeys.ProduceKey => handleProducerRequest(request)
    10         case RequestKeys.FetchKey => handleFetchRequest(request)
    11         case RequestKeys.OffsetsKey => handleOffsetRequest(request)
    12         case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
    13         case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
    14         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
    15         case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
    16         case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
    17         case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
    18         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
    19         case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
    20         case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
    21         case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
    22         case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
    23         case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
    24         case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
    25         case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
    26         case requestId => throw new KafkaException("Unknown api code " + requestId)
    27       }
    28     } catch {
    29       case e: Throwable =>
    30         if ( request.requestObj != null)
    31           request.requestObj.handleError(e, requestChannel, request)
    32         else {
    33           val response = request.body.getErrorResponse(request.header.apiVersion, e)
    34           val respHeader = new ResponseHeader(request.header.correlationId)
    35 
    36           /* If request doesn't have a default error response, we just close the connection.
    37              For example, when produce request has acks set to 0 */
    38           if (response == null)
    39             requestChannel.closeConnection(request.processor, request)
    40           else
    41             requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
    42         }
    43         error("error when handling request %s".format(request.requestObj), e)
    44     } finally
    45       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
    46   }

    我们以处理生产者请求为例:

     1 /**
     2    * Handle a produce request
     3    */
     4   def handleProducerRequest(request: RequestChannel.Request) {
     5     val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
     6     val numBytesAppended = produceRequest.sizeInBytes
     7 
     8     val (authorizedRequestInfo, unauthorizedRequestInfo) =  produceRequest.data.partition  {
     9       case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))
    10     }
    11 
    12     // the callback for sending a produce response
    13     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
    14 
    15       val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
    16 
    17       var errorInResponse = false
    18 
    19       mergedResponseStatus.foreach { case (topicAndPartition, status) =>
    20         if (status.error != ErrorMapping.NoError) {
    21           errorInResponse = true
    22           debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
    23             produceRequest.correlationId,
    24             produceRequest.clientId,
    25             topicAndPartition,
    26             ErrorMapping.exceptionNameFor(status.error)))
    27         }
    28       }
    29 
    30       def produceResponseCallback(delayTimeMs: Int) {
    31 
    32         if (produceRequest.requiredAcks == 0) {
    33           // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
    34           // the request, since no response is expected by the producer, the server will close socket server so that
    35           // the producer client will know that some error has happened and will refresh its metadata
    36           if (errorInResponse) {
    37             val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
    38               topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)
    39             }.mkString(", ")
    40             info(
    41               s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
    42                 s"from client id ${produceRequest.clientId} with ack=0\n" +
    43                 s"Topic and partition to exceptions: $exceptionsSummary"
    44             )
    45             requestChannel.closeConnection(request.processor, request)
    46           } else {
    47             requestChannel.noOperation(request.processor, request)
    48           }
    49         } else {
    50           val response = ProducerResponse(produceRequest.correlationId,
    51                                           mergedResponseStatus,
    52                                           produceRequest.versionId,
    53                                           delayTimeMs)
    54           requestChannel.sendResponse(new RequestChannel.Response(request,
    55                                                                   new RequestOrResponseSend(request.connectionId,
    56                                                                                             response)))
    57         }
    58       }
    59 
    60       // When this callback is triggered, the remote API call has completed
    61       request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
    62 
    63       quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
    64                                                                    numBytesAppended,
    65                                                                    produceResponseCallback)
    66     }
    67 
    68     if (authorizedRequestInfo.isEmpty)
    69       sendResponseCallback(Map.empty)
    70     else {
    71       val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
    72 
    73       // call the replica manager to append messages to the replicas
    74       replicaManager.appendMessages(
    75         produceRequest.ackTimeoutMs.toLong,
    76         produceRequest.requiredAcks,
    77         internalTopicsAllowed,
    78         authorizedRequestInfo,
    79         sendResponseCallback)
    80 
    81       // if the request is put into the purgatory, it will have a held reference
    82       // and hence cannot be garbage collected; hence we clear its data here in
    83       // order to let GC re-claim its memory since it is already appended to log
    84       produceRequest.emptyData()
    85     }
    86   }

     

    3.9 动态配置管理DynamicConfigManager

    利用zookeeper做动态配置中心

     1 /**
     2    * Begin watching for config changes
     3    */
     4   def startup() {
     5     zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
     6     zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
     7     processAllConfigChanges()
     8   }
     9 
    10   /**
    11    * Process all config changes
    12    */
    13   private def processAllConfigChanges() {
    14     val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
    15     import JavaConversions._
    16     processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
    17   }
    18 
    19   /**
    20    * Process the given list of config changes
    21    */
    22   private def processConfigChanges(notifications: Seq[String]) {
    23     if (notifications.size > 0) {
    24       info("Processing config change notification(s)...")
    25       val now = time.milliseconds
    26       for (notification <- notifications) {
    27         val changeId = changeNumber(notification)
    28 
    29         if (changeId > lastExecutedChange) {
    30           val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
    31 
    32           val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
    33           processNotification(jsonOpt)
    34         }
    35         lastExecutedChange = changeId
    36       }
    37       purgeObsoleteNotifications(now, notifications)
    38     }
    39   }

    3.10 心跳检测KafkaHealthcheck

    心跳检测也使用zookeeper维持:

     1 def startup() {
     2     zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
     3     register()
     4   }
     5 
     6   /**
     7    * Register this broker as "alive" in zookeeper
     8    */
     9   def register() {
    10     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
    11     val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
    12       if (endpoint.host == null || endpoint.host.trim.isEmpty)
    13         EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType)
    14       else
    15         endpoint
    16     )
    17 
    18     // the default host and port are here for compatibility with older client
    19     // only PLAINTEXT is supported as default
    20     // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
    21     val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
    22     zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
    23   }

    4 小结

    kafka中KafkaServer类,是网络处理,io处理等的入口.

    ReplicaManager    副本管理

    KafkaApis    处理所有request的Proxy类,根据requestKey决定调用具体的handler

    KafkaRequestHandlerPool 处理request的线程池,请求处理池

    LogManager    kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partition数据

    TopicConfigManager  监听zk节点/config/changes/的子节点,通过LogManager更新topic的配置信息,topic粒度配置管理

    KafkaHealthcheck 监听zk session expire,在zk上创建broker信息,便于其他broker和consumer获取其信息

    KafkaController  kafka集群中央控制器选举,leader选举,副本分配。

    KafkaScheduler  负责副本管理和日志管理调度等等

    ZkClient         负责注册zk相关信息.

    BrokerTopicStats  topic信息统计和监控

    ControllerStats          中央控制器统计和监控

  • 相关阅读:
    Codeforces 877 C. Slava and tanks
    Codeforces 877 D. Olya and Energy Drinks
    2017 10.25 NOIP模拟赛
    2017 国庆湖南 Day1
    UVA 12113 Overlapping Squares
    学大伟业 国庆Day2
    51nod 1629 B君的圆锥
    51nod 1381 硬币游戏
    [JSOI2010]满汉全席
    学大伟业 2017 国庆 Day1
  • 原文地址:https://www.cnblogs.com/zcjcsl/p/8619484.html
Copyright © 2011-2022 走看看