  • MetadataCache updates

    When is MetadataCache updated?

    The updateCache method is what updates the cache.

    Initiating thread: controller-event-thread

    On controller election

    CLASS_NAME METHOD_NAME LINE_NUM
    kafka/controller/KafkaController sendUpdateMetadataRequest 1043
    kafka/controller/KafkaController onControllerFailover 288
    kafka/controller/KafkaController elect 1658
    kafka/controller/KafkaController$Startup$ process 1581
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/metrics/KafkaTimer time 32
    kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
    kafka/utils/ShutdownableThread run 70

    The election happens at startup; the startup action is itself an event.

    
    // KafkaController.scala
      case object Startup extends ControllerEvent {
    
        def state = ControllerState.ControllerChange
    
        override def process(): Unit = {
          registerSessionExpirationListener()
          registerControllerChangeListener()
          elect()
        }
    
      }
    

    On broker startup

    CLASS_NAME METHOD_NAME LINE_NUM
    kafka/controller/KafkaController sendUpdateMetadataRequest 1043
    kafka/controller/KafkaController onBrokerStartup 387
    kafka/controller/KafkaController$BrokerChange process 1208
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/metrics/KafkaTimer time 32
    kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
    kafka/utils/ShutdownableThread run 70

    On topic deletion

    CLASS_NAME METHOD_NAME LINE_NUM
    kafka/controller/KafkaController sendUpdateMetadataRequest 1043
    kafka/controller/TopicDeletionManager kafka$controller$TopicDeletionManager$$onTopicDeletion 268
    kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 apply 333
    kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 apply 333
    scala/collection/immutable/Set$Set1 foreach 94
    kafka/controller/TopicDeletionManager resumeDeletions 333
    kafka/controller/TopicDeletionManager enqueueTopicsForDeletion 110
    kafka/controller/KafkaController$TopicDeletion process 1280
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/metrics/KafkaTimer time 32
    kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
    kafka/utils/ShutdownableThread run 70

    On topic creation or modification

    CLASS_NAME METHOD_NAME LINE_NUM
    kafka/controller/ControllerBrokerRequestBatch updateMetadataRequestBrokerSet 291
    kafka/controller/ControllerBrokerRequestBatch newBatch 294
    kafka/controller/PartitionStateMachine handleStateChanges 105
    kafka/controller/KafkaController onNewPartitionCreation 499
    kafka/controller/KafkaController onNewTopicCreation 485
    kafka/controller/KafkaController$TopicChange process 1237
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/metrics/KafkaTimer time 32
    kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
    kafka/utils/ShutdownableThread run 70

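    Topic creation is handled by taking an event off a queue and then processing it.
    The queue is kafka.controller.ControllerEventManager.queue.
    Events are put on it as shown in the call stack that follows; in essence this is still a watch on changes to the children of a ZooKeeper path: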

    CLASS_NAME METHOD_NAME LINE_NUM
    kafka/controller/ControllerEventManager put 44
    kafka/controller/TopicChangeListener handleChildChange 1712
    org/I0Itec/zkclient/ZkClient$10 run 848
    org/I0Itec/zkclient/ZkEventThread run 85
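
    The hand-off between the listener thread and the controller thread can be sketched roughly as follows. This is a simplified illustration, not Kafka's implementation: SketchEvent, SketchStartup, SketchTopicChange and SketchEventManager are hypothetical names, and drain() processes queued events on the calling thread, whereas the real controller runs an equivalent loop on the dedicated controller-event-thread.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for the controller's event types.
sealed trait SketchEvent {
  def process(log: ListBuffer[String]): Unit
}

case object SketchStartup extends SketchEvent {
  def process(log: ListBuffer[String]): Unit = log += "elect"
}

final case class SketchTopicChange(topics: Set[String]) extends SketchEvent {
  def process(log: ListBuffer[String]): Unit =
    log += s"new topics: ${topics.toSeq.sorted.mkString(",")}"
}

// Simplified event manager: producers call put(), one consumer drains the queue.
class SketchEventManager {
  private val queue = new LinkedBlockingQueue[SketchEvent]()
  val log: ListBuffer[String] = ListBuffer.empty[String]

  // Called from listener threads, e.g. a ZooKeeper child-change callback.
  def put(event: SketchEvent): Unit = queue.put(event)

  // Processes everything currently queued, in FIFO order.
  def drain(): Unit = {
    var event = queue.poll()
    while (event != null) {
      event.process(log)
      event = queue.poll()
    }
  }
}
```

    Because every event goes through one queue and one consumer, the handlers never run concurrently with each other, which is why the call stacks in this article all start from the same doWork frame.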

    The listener is registered as follows:

    // class KafkaController
      private def registerTopicChangeListener() = {
        zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
      }
    

    As an aside, six places subscribe to ZooKeeper child-node changes:

    • DynamicConfigManager.startup
    • registerTopicChangeListener
    • registerIsrChangeNotificationListener
    • registerTopicDeletionListener
    • registerBrokerChangeListener
    • registerLogDirEventNotificationListener

    Handling the topic creation event:

    // ControllerChannelManager.scala  class ControllerBrokerRequestBatch
      def sendRequestsToBrokers(controllerEpoch: Int) {
      // .......
          val updateMetadataRequest = {
            val liveBrokers = if (updateMetadataRequestVersion == 0) {
              // .......
            } else {
              controllerContext.liveOrShuttingDownBrokers.map { broker =>
                val endPoints = broker.endPoints.map { endPoint =>
                  new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
                }
                new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
              }
            }
            new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, partitionStates.asJava,
              liveBrokers.asJava)
          }
          updateMetadataRequestBrokerSet.foreach { broker =>
            controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null)
          }
          // .......
        }
    

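    Going one step further into how metadata is updated when a topic is created:
    a send-request item is built and put on the send queue, where it waits for the send thread to send it.
    The code that builds and enqueues the item is as follows: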

    // ControllerChannelManager
      def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
                      callback: AbstractResponse => Unit = null) {
        brokerLock synchronized {
          val stateInfoOpt = brokerStateInfo.get(brokerId)
          stateInfoOpt match {
            case Some(stateInfo) =>
              stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
            case None =>
              warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
          }
        }
      }
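
    The per-broker queue lookup in sendRequest can be illustrated with a stripped-down sketch. SketchQueueItem and SketchChannelManager are hypothetical names, and the item carries plain strings instead of the real ApiKeys value, request builder and callback; the sketch only shows the "enqueue if the broker is known, otherwise skip" decision made under a lock.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical, simplified queue item (the real one holds an API key,
// a request builder and a callback).
final case class SketchQueueItem(apiKey: String, payload: String)

class SketchChannelManager {
  private val lock = new Object
  private val queues =
    mutable.HashMap.empty[Int, LinkedBlockingQueue[SketchQueueItem]]

  // Registers a broker and gives it its own message queue.
  def addBroker(brokerId: Int): Unit = lock.synchronized {
    queues.getOrElseUpdate(brokerId, new LinkedBlockingQueue[SketchQueueItem]())
    ()
  }

  // Returns true if the item was enqueued, false if the broker is unknown
  // (the original code logs a warning in that case).
  def sendRequest(brokerId: Int, item: SketchQueueItem): Boolean =
    lock.synchronized {
      queues.get(brokerId) match {
        case Some(q) =>
          q.put(item)
          true
        case None =>
          false
      }
    }

  // Number of items waiting for the given broker's send thread.
  def pending(brokerId: Int): Int = lock.synchronized {
    queues.get(brokerId).map(_.size).getOrElse(0)
  }
}
```

    Note that sendRequest only enqueues; nothing goes over the network until the broker's send thread takes the item off its queue.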
    

    Call stack:

    CLASS_NAME METHOD_NAME LINE_NUM
    kafka/controller/ControllerChannelManager sendRequest 81
    kafka/controller/KafkaController sendRequest 662
    kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2 apply 405
    kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2 apply 405
    scala/collection/mutable/HashMap$$anonfun$foreach$1 apply 130
    scala/collection/mutable/HashMap$$anonfun$foreach$1 apply 130
    scala/collection/mutable/HashTable$class foreachEntry 241
    scala/collection/mutable/HashMap foreachEntry 40
    scala/collection/mutable/HashMap foreach 130
    kafka/controller/ControllerBrokerRequestBatch sendRequestsToBrokers 502
    kafka/controller/PartitionStateMachine handleStateChanges 105
    kafka/controller/KafkaController onNewPartitionCreation 499
    kafka/controller/KafkaController onNewTopicCreation 485
    kafka/controller/KafkaController$TopicChange process 1237
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
    kafka/metrics/KafkaTimer time 32
    kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
    kafka/utils/ShutdownableThread run 70

    The send thread then sends the request.
    The code is as follows:

    // ControllerChannelManager.scala class RequestSendThread
      override def doWork(): Unit = {
    
        def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))
    
        val QueueItem(apiKey, requestBuilder, callback) = queue.take()
        //...
        while (isRunning.get() && !isSendSuccessful) {
            // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
            // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
            try {
              if (!brokerReady()) {
                isSendSuccessful = false
                backoff()
              }
              else {
                val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
                  time.milliseconds(), true)
                clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
                isSendSuccessful = true
              }
            } catch {
              case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
                warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                  "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                    requestBuilder.toString, brokerNode.toString), e)
                networkClient.close(brokerNode.idString)
                isSendSuccessful = false
                backoff()
            }
          }
          // ......
      }
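
    The retry behaviour of this loop can be isolated into a small sketch. SketchRetry.sendWithRetry is a hypothetical helper, not a Kafka API: it keeps trying until the send succeeds or the thread is told to stop, and it backs off both when the broker is not ready and when the send throws. Unlike the original, it catches only non-fatal exceptions and does not model closing the connection before the retry.

```scala
import scala.util.control.NonFatal

object SketchRetry {
  // Retries `send` until it returns or `isRunning` turns false.
  // Returns None if the loop was stopped before a send succeeded.
  def sendWithRetry[A](
      isRunning: () => Boolean,
      brokerReady: () => Boolean,
      backoffMs: Long
  )(send: () => A): Option[A] = {
    var result: Option[A] = None
    while (isRunning() && result.isEmpty) {
      try {
        if (!brokerReady()) Thread.sleep(backoffMs) // wait for the connection
        else result = Some(send())
      } catch {
        // The original closes the connection here before backing off.
        case NonFatal(_) => Thread.sleep(backoffMs)
      }
    }
    result
  }
}
```

    The isRunning check is what ends the loop for a broker that stays down: as the comment in the original code explains, removing the broker shuts the thread down, and at that point retrying stops.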
    
    

    Response thread

    CLASS_NAME METHOD_NAME LINE_NUM
    kafka/server/MetadataCache kafka$server$MetadataCache$$addOrUpdatePartitionInfo 150
    kafka/utils/CoreUtils$ inLock 219
    kafka/utils/CoreUtils$ inWriteLock 225
    kafka/server/MetadataCache updateCache 184
    kafka/server/ReplicaManager maybeUpdateMetadataCache 988
    kafka/server/KafkaApis handleUpdateMetadataRequest 212
    kafka/server/KafkaApis handle 142
    kafka/server/KafkaRequestHandler run 72

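    Thread: kafka-request-handler-5
    The partitionMetadataLock read-write lock keeps reads and writes of the cache data thread-safe. The metadata itself was already built into the request when it was sent. This step also covers updating the live brokers, among other things.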
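
    The read-write locking pattern can be sketched as follows. SketchMetadataCache and its topic -> (partition -> leader) layout are hypothetical simplifications; only the idea of taking the write lock in updateCache and the read lock for lookups comes from the call stack in this section.

```scala
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
import scala.collection.mutable

// Hypothetical, simplified cache: topic -> (partition -> leader broker id).
class SketchMetadataCache {
  private val lock = new ReentrantReadWriteLock()
  private val cache = mutable.Map.empty[String, mutable.Map[Int, Int]]

  // Runs `body` while holding `l`, always releasing it afterwards.
  private def inLock[T](l: Lock)(body: => T): T = {
    l.lock()
    try body
    finally l.unlock()
  }

  // Writers are exclusive: no reader sees a half-applied update.
  def updateCache(topic: String, partition: Int, leader: Int): Unit =
    inLock(lock.writeLock()) {
      cache.getOrElseUpdate(topic, mutable.Map.empty[Int, Int]).put(partition, leader)
      ()
    }

  // Readers share the read lock, so lookups can run concurrently.
  def leaderFor(topic: String, partition: Int): Option[Int] =
    inLock(lock.readLock()) {
      cache.get(topic).flatMap(_.get(partition))
    }
}
```

    A read-write lock suits this access pattern because metadata lookups are frequent while updates from the controller are comparatively rare.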

    Still to be added: leader changes, ISR changes, and so on.

  • Original article: https://www.cnblogs.com/simoncook/p/11809452.html