Kafka Source Code Analysis

     

    This article analyzes the Kafka source code, based on version kafka-0.8.2.1. Due to limited time, updates may be slow...

    Kafka.scala

    // Load the configuration file
    val props = Utils.loadProps(args(0))
    val serverConfig = new KafkaConfig(props)
    KafkaMetricsReporter.startReporters(serverConfig.props)
    
    val kafkaServerStartable = new KafkaServerStartable(serverConfig)
    
    // Register a shutdown hook so KafkaServerStartable.shutdown is called when the JVM exits
    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = kafkaServerStartable.shutdown
    })  
    
    // Start the server and wait for it to terminate
    kafkaServerStartable.startup
    kafkaServerStartable.awaitShutdown
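
    KafkaServerStartable is a thin wrapper around KafkaServer. A minimal sketch of that delegation (simplified, not the full source, which also logs fatal startup errors and exits the JVM):

    // Simplified sketch: KafkaServerStartable mainly delegates to KafkaServer
    class KafkaServerStartable(val serverConfig: KafkaConfig) {
      private val server = new KafkaServer(serverConfig)
    
      // Start the broker
      def startup() = server.startup()
    
      // Trigger an orderly shutdown of the broker
      def shutdown() = server.shutdown()
    
      // Block until the broker has fully shut down
      def awaitShutdown() = server.awaitShutdown()
    }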
    

      

    Server

            The class actually invoked here is KafkaServer.

    def startup() {
      kafkaScheduler.startup()
    
      // Initialize the relevant paths in ZooKeeper
      zkClient = initZk()
    
      // Log manager
      logManager = createLogManager(zkClient, brokerState)
      logManager.startup()
    
      socketServer = new SocketServer(...)
      socketServer.startup()
    
      // Create the replica manager (started further below)
      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
    
      // Create the offset manager
      offsetManager = createOffsetManager()
    
      // Instantiate the controller
      kafkaController = new KafkaController(config, zkClient, brokerState)
    
      // Request processing logic (the API layer)
      apis = new KafkaApis(...)
    
      // Thread pool that handles network requests
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)
    
      Mx4jLoader.maybeLoad()
      replicaManager.startup()
      kafkaController.startup()
    
      // Topic config manager
      topicConfigManager = new TopicConfigManager(zkClient, logManager)
      topicConfigManager.startup()
    
      // Broker health check (registers the broker and monitors the ZK session)
      kafkaHealthcheck = new KafkaHealthcheck(...)
      kafkaHealthcheck.startup()
    
      registerStats()
      startupComplete.set(true)
      info("started")
    }

            As seen above, KafkaServer.startup mainly initializes and starts several core services.

    private def initZk(): ZkClient =
    {
      info("Connecting to zookeeper on " + config.zkConnect)
       
      // Kafka's working root (chroot) path in ZooKeeper
      val chroot = {
        if (config.zkConnect.indexOf("/") > 0)
          config.zkConnect.substring(config.zkConnect.indexOf("/"))
        else
          ""
      }
      // Create the chroot path if one was configured
      if (chroot.length > 1) {
        val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
        val zkClientForChrootCreation = new ZkClient(...)
        ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
        info("Created zookeeper path " + chroot)
        zkClientForChrootCreation.close()
      }
     
      // Instantiate the ZkClient used by the broker
      val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
      // Create the required persistent paths in ZooKeeper
      ZkUtils.setupCommonPaths(zkClient)
      zkClient
    }
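
    To make the chroot handling concrete, here is a small standalone sketch (the connection string is a made-up example, not taken from the source) of how zookeeper.connect is split into the connection part and the chroot:

    // Standalone illustration of the chroot parsing in initZk();
    // the zookeeper.connect value below is hypothetical
    object ChrootExample extends App {
      val zkConnect = "zk1:2181,zk2:2181,zk3:2181/kafka"
    
      // Everything from the first "/" onwards is treated as the chroot
      val chroot =
        if (zkConnect.indexOf("/") > 0) zkConnect.substring(zkConnect.indexOf("/"))
        else ""
    
      // The part before the "/" is used when creating the chroot path itself
      val zkConnForChrootCreation =
        if (chroot.length > 1) zkConnect.substring(0, zkConnect.indexOf("/"))
        else zkConnect
    
      println(s"chroot = '$chroot'")                      // chroot = '/kafka'
      println(s"connection = '$zkConnForChrootCreation'") // connection = 'zk1:2181,zk2:2181,zk3:2181'
    }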
    

      

            KafkaScheduler is essentially a wrapper around the ScheduledThreadPoolExecutor thread pool, so it is not analyzed in depth here; a rough sketch of the idea is shown below.
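
    The standalone sketch below (not the actual KafkaScheduler code) schedules a recurring task on a ScheduledThreadPoolExecutor, roughly what KafkaScheduler.schedule does for tasks such as "isr-expiration":

    import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
    
    // Standalone sketch of periodic scheduling; pool size and period are illustrative
    object SchedulerSketch extends App {
      val executor = new ScheduledThreadPoolExecutor(1)
    
      // Run the task every 10 seconds after an initial 10 second delay
      executor.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = println("periodic task fired")
      }, 10, 10, TimeUnit.SECONDS)
    
      // On shutdown the pool would be drained, e.g. executor.shutdown()
    }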

    KafkaHealthcheck(...)
    {
      val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
      val sessionExpireListener = new SessionExpireListener
    
      def startup() 
      {
        // Subscribe a ZooKeeper state (session) change listener
        zkClient.subscribeStateChanges(sessionExpireListener)
        // Create an ephemeral node at /brokers/ids/<id> in ZooKeeper and write the broker info into it
        register()
      }
    }

           

            IZkStateListener defines two kinds of events: one is a change of the connection state, for example going from disconnected to connected, or from connected to expired;

            the other is the creation of a new session (connection), which is usually triggered after the old session expires and a new one is established.

    class SessionExpireListener() extends IZkStateListener 
    {
      @throws(classOf[Exception])
      def handleStateChanged(state: KeeperState) {}
    
      @throws(classOf[Exception])
      def handleNewSession() = register()
    }
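
    register(), re-run on every new session as seen in handleNewSession above, writes the broker's ephemeral node. A hedged sketch of the idea, using I0Itec's ZkClient directly rather than Kafka's ZkUtils helpers, with an illustrative payload:

    import org.I0Itec.zkclient.ZkClient
    
    // Rough sketch only: the real code goes through ZkUtils and writes a
    // versioned JSON payload containing host, port, JMX port and timestamp
    object BrokerRegistrationSketch {
      def register(zkClient: ZkClient, brokerId: Int, host: String, port: Int): Unit = {
        val path = "/brokers/ids/" + brokerId
        // Hypothetical payload; the actual format is a versioned JSON document
        val data = s"""{"host":"$host","port":$port}"""
        // Ephemeral: the node disappears when the session expires, which is
        // exactly why handleNewSession() must call register() again
        zkClient.createEphemeral(path, data)
      }
    }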
    

      

     ReplicaManager

    def startup() 
    {
      scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
    }
    // maybeShrinkIsr is invoked periodically by the scheduler
    private def maybeShrinkIsr(): Unit = 
    {
      trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
      allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
    }
    

      

    This calls maybeShrinkIsr in cluster.Partition to remove stuck or slow replicas from the ISR and to update the high watermark (HW).

    def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) 
    {
      inWriteLock(leaderIsrUpdateLock) {
        leaderReplicaIfLocal() match {
          case Some(leaderReplica) =>
            // Find stuck and slow replicas and remove them from the ISR
            val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
            if(outOfSyncReplicas.size > 0) {
              val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
              assert(newInSyncReplicas.size > 0)
              // Update the ISR in ZooKeeper
              updateIsr(newInSyncReplicas)
              // Recompute the high watermark and update it if necessary
              maybeIncrementLeaderHW(leaderReplica)
              replicaManager.isrShrinkRate.mark()
            }
      ...
    }
    
    def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = 
    {
      // The leader's last written offset (log end offset)
      val leaderLogEndOffset = leaderReplica.logEndOffset
      // All ISR members except the leader replica itself
      val candidateReplicas = inSyncReplicas - leaderReplica
      // Stuck replicas: no log-end-offset update within keepInSyncTimeMs
      val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
      // Slow replicas:
      // Condition 1: the replica's log end offset is >= 0
      // Condition 2: the leader's log end offset minus the replica's exceeds the threshold (keepInSyncMessages)
      val slowReplicas = candidateReplicas.filter(r =>
        r.logEndOffset.messageOffset >= 0 &&
        leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
      // Return the union of stuck and slow replicas
      stuckReplicas ++ slowReplicas
    }
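
    A small standalone sketch (with a made-up ToyReplica class and made-up numbers, not Kafka's Replica) of how the two filters classify followers:

    // Toy model of the two ISR-shrinking conditions; all values are illustrative
    case class ToyReplica(id: Int, logEndOffset: Long, lastUpdateMs: Long)
    
    object IsrShrinkSketch extends App {
      val now                = 100000L
      val keepInSyncTimeMs   = 10000L   // cf. replica.lag.time.max.ms
      val keepInSyncMessages = 4000L    // cf. replica.lag.max.messages
      val leaderLogEndOffset = 50000L
    
      val followers = Set(
        ToyReplica(1, 49990L, now - 1000),    // healthy: small lag, recent fetch
        ToyReplica(2, 49000L, now - 20000),   // stuck: no progress for 20s > 10s
        ToyReplica(3, 40000L, now - 1000)     // slow: 10000 messages behind > 4000
      )
    
      // Same two conditions as getOutOfSyncReplicas above
      val stuck = followers.filter(r => (now - r.lastUpdateMs) > keepInSyncTimeMs)
      val slow  = followers.filter(r =>
        r.logEndOffset >= 0 && leaderLogEndOffset - r.logEndOffset > keepInSyncMessages)
    
      println((stuck ++ slow).map(_.id))   // prints Set(2, 3)
    }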
    

      

    Cluster

    Controller

     
