本文主要对Kafka的源码进行分析，所用版本为kafka-0.8.2.1。由于时间有限，更新可能会比较慢……
Kafka.scala
// Excerpt from the broker entry point (Kafka.main); `args` are the command-line arguments.
// Load the broker configuration from the properties file given as the first argument.
val props = Utils.loadProps(args(0))
val serverConfig = new KafkaConfig(props)
// Start the metrics reporters (presumably those configured in the properties — confirm in KafkaMetricsReporter).
KafkaMetricsReporter.startReporters(serverConfig.props)
val kafkaServerStartable = new KafkaServerStartable(serverConfig)
// Register a shutdown hook so that KafkaServerStartable.shutdown is called when the JVM shuts down.
// It is registered before startup so a shutdown during startup still triggers it.
Runtime.getRuntime().addShutdownHook(new Thread() {
  override def run() = kafkaServerStartable.shutdown
})
// Start the server, then block until it has shut down.
kafkaServerStartable.startup
kafkaServerStartable.awaitShutdown
Server
KafkaServerStartable内部实际调用的是KafkaServer。
// KafkaServer.startup (abridged: some constructor arguments are elided as `...`).
// Initializes and starts the broker's services in a fixed order; the order matters because
// later services are constructed from earlier ones (zkClient, logManager, socketServer, ...).
def startup() {
  kafkaScheduler.startup()
  // Connect to ZooKeeper and create the paths Kafka needs there (see initZk).
  zkClient = initZk()
  // Log manager.
  logManager = createLogManager(zkClient, brokerState)
  logManager.startup()
  socketServer = new SocketServer(...)
  socketServer.startup()
  // Create the replica manager; it is started further below via replicaManager.startup().
  replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
  // Create the offset manager.
  offsetManager = createOffsetManager()
  // Create the controller (not a scheduler — the scheduler is kafkaScheduler above).
  kafkaController = new KafkaController(config, zkClient, brokerState)
  // Request-handling API.
  apis = new KafkaApis(...)
  // Request-handler pool fed by the socket server's request channel, sized by config.numIoThreads.
  requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
  brokerState.newState(RunningAsBroker)
  Mx4jLoader.maybeLoad()
  replicaManager.startup()
  kafkaController.startup()
  // Topic config manager.
  topicConfigManager = new TopicConfigManager(zkClient, logManager)
  topicConfigManager.startup()
  // Broker health check: registers this broker in ZooKeeper (see KafkaHealthcheck.startup).
  kafkaHealthcheck = new KafkaHealthcheck(...)
  kafkaHealthcheck.startup()
  registerStats()
  startupComplete.set(true)
  info("started")
}
可以看到，KafkaServer的startup方法主要负责几个核心服务的初始化和启动。
// Creates the ZkClient used by the broker (abridged: one constructor call is elided as `...`).
// If config.zkConnect carries a chroot suffix (e.g. "host:port/kafka"), that path is created first.
private def initZk(): ZkClient = {
  info("Connecting to zookeeper on " + config.zkConnect)
  // Kafka's root path inside ZooKeeper: everything from the first "/" in zkConnect, or "" if there is none.
  val chroot = {
    if (config.zkConnect.indexOf("/") > 0) config.zkConnect.substring(config.zkConnect.indexOf("/"))
    else ""
  }
  // A chroot longer than "/" may not exist yet, so create it through a temporary client.
  // NOTE(review): the elided ZkClient arguments presumably use zkConnForChrootCreation (the connect
  // string without the chroot suffix) — confirm against the full source.
  if (chroot.length > 1) {
    val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
    val zkClientForChrootCreation = new ZkClient(...)
    ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
    info("Created zookeeper path " + chroot)
    zkClientForChrootCreation.close()
  }
  // The long-lived client, connected with the full zkConnect string (including any chroot).
  val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
  // Create the common persistent paths Kafka requires in ZooKeeper.
  ZkUtils.setupCommonPaths(zkClient)
  zkClient
}
KafkaScheduler实际上是对线程池ScheduledThreadPoolExecutor的封装，这里不做过多分析。
// Abridged excerpt of class KafkaHealthcheck: the `class` keyword and constructor parameters are elided.
KafkaHealthcheck(...) {
  // ZooKeeper path of this broker's registration node: /brokers/ids/<brokerId>.
  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
  val sessionExpireListener = new SessionExpireListener
  def startup() {
    // Subscribe a ZooKeeper state listener (connection-state changes and new-session events).
    zkClient.subscribeStateChanges(sessionExpireListener)
    // Create the ephemeral node /brokers/ids/<id> and write this broker's information into it.
    register()
  }
}
IZkStateListener定义了两种事件：一种是连接状态的改变（handleStateChanged），例如由未连接变为已连接、由已连接变为过期等；
另一种是新session（连接）的创建（handleNewSession），通常在原session过期、新的session建立之后触发。
/**
 * ZooKeeper state listener used by KafkaHealthcheck.
 *
 * Connection-state transitions are ignored. Whenever ZooKeeper reports a new session, for example
 * after the previous one expired, `register()` is called again. This recreates the broker's
 * ephemeral registration node, which does not survive the old session.
 */
class SessionExpireListener() extends IZkStateListener {

  /** Connection-state changes require no action. */
  @throws(classOf[Exception])
  def handleStateChanged(state: KeeperState): Unit = ()

  /** A new session has been established: register the broker again. */
  @throws(classOf[Exception])
  def handleNewSession(): Unit = register()
}
ReplicaManager
// ReplicaManager.startup: schedule the periodic ISR-shrink check.
def startup() {
  // Run maybeShrinkIsr as the "isr-expiration" task, once every config.replicaLagTimeMaxMs milliseconds.
  scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
}
// Called periodically by the scheduler. Asks every partition to drop out-of-sync replicas from
// its ISR, using the configured lag limits (time and number of messages).
private def maybeShrinkIsr(): Unit = {
  trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
  allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
}
这里调用了cluster.Partition中的maybeShrinkIsr，将卡住的或落后过多的副本从ISR中移除，并更新HighWatermark（HW）。
// Partition.maybeShrinkIsr (abridged excerpt): if this broker is the partition leader, remove
// out-of-sync followers from the ISR. The whole update runs under the leader/ISR write lock.
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
  inWriteLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        // Find the stuck and slow replicas and remove them from the ISR.
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
        if(outOfSyncReplicas.size > 0) {
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          // getOutOfSyncReplicas only examines ISR members other than the leader, so (assuming the
          // leader is itself in the ISR) the shrunken ISR can never be empty.
          assert(newInSyncReplicas.size > 0)
          // Update the ISR (the original article notes this is persisted to ZooKeeper).
          updateIsr(newInSyncReplicas)
          // Recompute the leader's high watermark and advance it if possible.
          maybeIncrementLeaderHW(leaderReplica)
          replicaManager.isrShrinkRate.mark()
        }
      // NOTE: excerpt abridged here — the remaining match cases and closing braces are elided.
      ...
    }
/**
 * Finds the ISR followers that have fallen out of sync with the leader.
 *
 * Only ISR members other than `leaderReplica` are examined. A follower is reported when it is
 *  - stuck: more than `keepInSyncTimeMs` ms have passed since its log end offset was last updated, or
 *  - slow: its log end offset is non-negative (note: `>= 0`, so an offset of exactly 0 counts) and it
 *    trails the leader's log end offset by more than `keepInSyncMessages` messages.
 *
 * @param leaderReplica      the leader replica; never part of the result
 * @param keepInSyncTimeMs   maximum allowed time since a follower's last log-end-offset update
 * @param keepInSyncMessages maximum allowed message lag behind the leader
 * @return the union of the stuck followers and the slow followers
 */
def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
  // Leader's log end offset, read once before the followers are scanned.
  val leaderEnd = leaderReplica.logEndOffset
  val followers = inSyncReplicas - leaderReplica

  def isStuck(follower: Replica): Boolean =
    time.milliseconds - follower.logEndOffsetUpdateTimeMs > keepInSyncTimeMs

  def isSlow(follower: Replica): Boolean =
    follower.logEndOffset.messageOffset >= 0 &&
      leaderEnd.messageOffset - follower.logEndOffset.messageOffset > keepInSyncMessages

  val stuck = followers.filter(isStuck)
  val slow = followers.filter(isSlow)
  stuck ++ slow
}
Cluster
Controller