  [Original] Troubleshooting Notes (4): Kafka cluster broker nodes disappear from ZooKeeper

    Version: kafka_2.8.0-0.8.1 (Kafka 0.8.1, Scala 2.8.0 build)

    1. Symptoms

    A production Kafka cluster kept running into the same problem: the brokers' nodes disappeared from ZooKeeper while the Kafka processes and their ports stayed up, after which every broker logged errors, mainly:

    1)

    [2017-01-09 12:40:53,832] INFO Partition [topic1,3] on broker 1361: Shrinking ISR for partition [topic1,3] from 1351,1361,1341 to 1361 (kafka.cluster.Partition)

    2)

    [2017-01-09 12:33:53,858] ERROR Conditional update of path /brokers/topics/topic2/partitions/0/state with data {"controller_epoch":20,"leader":1361,"version":1,"leader_epoch":13,"isr":[1361]} and expected version 23 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/topic2/partitions/0/state (kafka.utils.ZkUtils$)
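    For context on error 2): the "Conditional update" performed by ZkUtils is a ZooKeeper compare-and-set on the partition-state znode, and BadVersionException means some other writer (typically the controller) bumped the znode's version between the read and the write. Below is a minimal sketch of the same pattern against the raw ZooKeeper client API; the host is a placeholder and building the new leader/ISR JSON is elided:

      import org.apache.zookeeper.KeeperException.BadVersionException
      import org.apache.zookeeper.data.Stat
      import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

      object ConditionalUpdateSketch {
        def main(args: Array[String]): Unit = {
          val zk = new ZooKeeper("zk-host:2182", 6000, new Watcher {
            def process(event: WatchedEvent): Unit = ()
          })
          val path = "/brokers/topics/topic2/partitions/0/state"
          val stat = new Stat
          // read the current state and remember the znode version we saw
          val current = new String(zk.getData(path, false, stat), "UTF-8")
          val updated = current // a real caller would build the new leader/isr JSON here
          try {
            // setData with an explicit expected version is a compare-and-set:
            // it throws BadVersionException if the version has moved on
            zk.setData(path, updated.getBytes("UTF-8"), stat.getVersion)
          } catch {
            case _: BadVersionException => () // lost the race: re-read, rebuild, retry
          } finally zk.close()
        }
      }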

    Clients were reporting errors at the same time.

    2. Diagnosis

    The most recent occurrence was around noon on January 9, 2017: three brokers (134/135/136 for short; they appear as broker ids 1341/1351/1361 in the logs) dropped off ZooKeeper one after another until the service became unavailable. The sequence of events reconstructed from the logs:

    136 is controller
    2017-01-09 12:33:33 134 zk disconnect
    2017-01-09 12:33:33 134 zk connect
    2017-01-09 12:33:36 135 zk disconnect
    2017-01-09 12:33:36 136 zk disconnect [disappear]
    2017-01-09 12:33:36 134 become controller
    2017-01-09 12:33:37 135 zk connect
    2017-01-09 12:33:42 134 zk disconnect [disappear]
    2017-01-09 12:33:44 135 become controller
    135 zk disconnect [disappear]
    134 restart
    2017-01-09 12:37:32 134 zk connect
    2017-01-09 12:37:32 134 become controller
    2017-01-09 12:37:39 134 zk disconnect [disappear]
    135 restart
    2017-01-09 12:38:14 135 zk connect [ok]
    2017-01-09 12:38:14 135 become controller [ok]
    134 restart
    2017-01-09 12:39:41 134 zk connect [ok]
    136 restart
    2017-01-09 12:41:19 136 zk connect [ok]

    During the 12:33-12:37 window in which the service was unavailable, jstack on all three brokers showed the same two stacks:

    1)

    "delete-topics-thread" prio=10 tid=0x00007f52ec06c800 nid=0x18b8 waiting on condition [0x00007f52b59dd000]

       java.lang.Thread.State: WAITING (parking)

            at sun.misc.Unsafe.park(Native Method)

            - parking to wait for  <0x00000000c55a2140> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

            at java.util.concurrent.locks.LockSupport.park(Unknown Source)

            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown Source)

            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown Source)

            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)

            at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)

            at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)

            at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)

            at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)

            at kafka.utils.Utils$.inLock(Utils.scala:538)

            at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)

            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

    2)

    "ZkClient-EventThread-12-kafka-common1.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common2.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common3.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common4.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common5.hangzhou-1.zookeeper.internal.lede.com:2182" daemon prio=10 tid=0x00007f533435f800 nid=0xbb4 waiting on condition [0x00007f531a8be000]

       java.lang.Thread.State: WAITING (parking)

            at sun.misc.Unsafe.park(Native Method)

            - parking to wait for  <0x00000000e41c5cf8> (a java.util.concurrent.CountDownLatch$Sync)

            at java.util.concurrent.locks.LockSupport.park(Unknown Source)

            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown Source)

            at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)

            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)

            at java.util.concurrent.CountDownLatch.await(Unknown Source)

            at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)

            at kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)

            at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)

            at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)

            at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)

            at kafka.utils.Utils$.inLock(Utils.scala:538)

            at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)

            at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)

            at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)

            at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)

            at kafka.utils.Utils$.inLock(Utils.scala:538)

            at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)

            at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)

            at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

    Start with the second stack. A Kafka cluster has exactly one controller at any time, and the identity of the current controller is recorded in the /controller znode in ZooKeeper.
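    Both the controller identity and the broker registrations that vanish in this incident can be inspected straight from ZooKeeper. Below is a minimal inspection sketch using the same zkclient library Kafka 0.8 depends on; the connection string is a placeholder, and the serializer is a stand-in for Kafka's ZKStringSerializer (znode data is plain UTF-8 JSON):

      import org.I0Itec.zkclient.ZkClient
      import org.I0Itec.zkclient.exception.ZkMarshallingError
      import org.I0Itec.zkclient.serialize.ZkSerializer

      object ZkInspect {
        // plain UTF-8 string (de)serializer, like kafka.utils.ZKStringSerializer
        private val utf8 = new ZkSerializer {
          @throws(classOf[ZkMarshallingError])
          def serialize(data: Object): Array[Byte] = data.toString.getBytes("UTF-8")
          @throws(classOf[ZkMarshallingError])
          def deserialize(bytes: Array[Byte]): Object =
            if (bytes == null) null else new String(bytes, "UTF-8")
        }

        def main(args: Array[String]): Unit = {
          val zk = new ZkClient("zk-host:2182", 6000, 6000, utf8)
          try {
            // /controller holds JSON like {"version":1,"brokerid":1361,...}
            println("controller: " + zk.readData[String]("/controller", true))
            // live brokers are the ephemeral children of /brokers/ids;
            // a healthy broker must have an entry here
            println("live brokers: " + zk.getChildren("/brokers/ids"))
          } finally zk.close()
        }
      }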
    The controller starts a DeleteTopicsThread and registers a SessionExpirationListener. When the ZooKeeper connection is dropped and a new session is established, handleNewSession is called back (below); its main job is to tear down the current controller state and re-elect:
      /**
       * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
       * any ephemeral nodes here.
       *
       * @throws Exception
       *             On any error.
       */
      @throws(classOf[Exception])
      def handleNewSession() {
        info("ZK expired; shut down all controller components and try to re-elect")
        inLock(controllerContext.controllerLock) {
          onControllerResignation()
          controllerElector.elect
        }
      }

    where onControllerResignation is:

      /**
       * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
       * required to clean up internal controller data structures
       */
      def onControllerResignation() {
        inLock(controllerContext.controllerLock) {
          if (config.autoLeaderRebalanceEnable)
            autoRebalanceScheduler.shutdown()
          deleteTopicManager.shutdown()
          Utils.unregisterMBean(KafkaController.MBeanName)
          partitionStateMachine.shutdown()
          replicaStateMachine.shutdown()
          if (controllerContext.controllerChannelManager != null) {
            controllerContext.controllerChannelManager.shutdown()
            controllerContext.controllerChannelManager = null
          }
        }
      }

    where deleteTopicManager.shutdown (TopicDeletionManager.shutdown) is:

      /**
       * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
       */
      def shutdown() {
        deleteTopicsThread.shutdown()
        topicsToBeDeleted.clear()
        topicsIneligibleForDeletion.clear()
      }

    and deleteTopicsThread.shutdown, inherited from ShutdownableThread, is:

      def shutdown(): Unit = {
        info("Shutting down")
        isRunning.set(false)
        if (isInterruptible)
          interrupt()
        shutdownLatch.await()
        info("Shutdown completed")
      }

    The thread in the second stack is parked on exactly this shutdownLatch.await() line. Besides the SessionExpirationListener described above, Kafka also starts a KafkaHealthcheck at broker startup:

      kafkaController = new KafkaController(config, zkClient)

      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

      Mx4jLoader.maybeLoad()

      replicaManager.startup()
      kafkaController.startup()

      topicConfigManager = new TopicConfigManager(zkClient, logManager)
      topicConfigManager.startup()

      /* tell everyone we are alive */
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
      kafkaHealthcheck.startup()

    KafkaHealthcheck registers a SessionExpireListener of its own (below); this is the code path that creates the broker node under /brokers/ids in ZooKeeper:

      /**
       * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
       * any ephemeral nodes here.
       *
       * @throws Exception
       *             On any error.
       */
      @throws(classOf[Exception])
      def handleNewSession() {
        info("re-registering broker info in ZK for broker " + brokerId)
        register()
        info("done re-registering broker")
        info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
      }

    The catch is that this listener is registered after the controller's. ZkClient keeps its listeners in a list and invokes the callbacks serially on a single event thread, so on the broker that currently holds the controller, KafkaHealthcheck's handleNewSession only runs after the controller's handleNewSession has returned. Once the controller's handleNewSession gets stuck, the broker never re-registers its node under /brokers/ids, which from the outside looks exactly like the broker disappearing from ZooKeeper.
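    A toy model of that dispatch order (illustrative classes, not zkclient's) makes the failure mode concrete: the listener registered first blocks, and the re-registration callback queued behind it never runs:

      import java.util.concurrent.CountDownLatch

      object SerialListenerDemo {
        trait Listener { def handleNewSession(): Unit }

        def main(args: Array[String]): Unit = {
          val stuckForever = new CountDownLatch(1)

          // registered first, like KafkaController.SessionExpirationListener:
          // stands in for a handleNewSession that never returns
          val controllerListener = new Listener {
            def handleNewSession(): Unit = stuckForever.await()
          }
          // registered second, like KafkaHealthcheck's listener: this is the
          // callback that would re-create the ephemeral /brokers/ids/<id> znode
          val healthcheck = new Listener {
            def handleNewSession(): Unit = println("re-registering broker in ZK")
          }

          // one event thread walks the listener list serially, like ZkEventThread
          new Thread(new Runnable {
            def run(): Unit =
              List(controllerListener, healthcheck).foreach(_.handleNewSession())
          }, "ZkClient-EventThread").start()
          // "re-registering broker in ZK" never prints: from the outside the
          // broker simply never reappears under /brokers/ids
        }
      }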
    Back to the controller: handleNewSession is stuck because onControllerResignation is waiting for DeleteTopicsThread to exit. DeleteTopicsThread's main loop (ShutdownableThread.run) is:
      override def run(): Unit = {
        info("Starting ")
        try {
          while (isRunning.get()) {
            doWork()
          }
        } catch {
          case e: Throwable =>
            if (isRunning.get())
              error("Error due to ", e)
        }
        shutdownLatch.countDown()
        info("Stopped ")
      }

    The first stack is blocked inside doWork (below):

      class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
        val zkClient = controllerContext.zkClient
        override def doWork() {
          inLock(controllerContext.controllerLock) {
            awaitTopicDeletionNotification()
            val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted

    where awaitTopicDeletionNotification is:

      /**
       * Invoked by the delete-topic-thread to wait until events that either trigger, restart or halt topic deletion occur.
       * controllerLock should be acquired before invoking this API
       */
      private def awaitTopicDeletionNotification() {
        while (!deleteTopicStateChanged) {
          info("Waiting for signal to start or continue topic deletion")
          deleteTopicsCond.await()
        }
        deleteTopicStateChanged = false
      }

    This condition is only ever signaled from resumeTopicDeletionThread:

      /**
       * Signals the delete-topic-thread to process topic deletion
       */
      private def resumeTopicDeletionThread() {
        deleteTopicStateChanged = true
        deleteTopicsCond.signal()
      }

    resumeTopicDeletionThread is invoked in only four situations; unless one of them occurs, doWork stays blocked indefinitely.
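    Reading the two stacks together also explains why the interrupt sent by shutdown() (isInterruptible defaults to true in ShutdownableThread) does not break the hang: the ZkClient event thread calls shutdown() while holding controllerLock and then parks on shutdownLatch, and the delete-topics thread, once woken, must reacquire that same controllerLock before its await() can return, which is why the first stack is parked on the ReentrantLock rather than on the condition. A self-contained sketch of the same shape, with illustrative names rather than Kafka's classes:

      import java.util.concurrent.CountDownLatch
      import java.util.concurrent.locks.ReentrantLock

      object ControllerShutdownDeadlock {
        private val controllerLock = new ReentrantLock()
        private val cond = controllerLock.newCondition()
        private val shutdownLatch = new CountDownLatch(1)
        @volatile private var running = true

        private val deleteTopicsThread = new Thread(new Runnable {
          def run(): Unit = {
            while (running) {
              controllerLock.lock()            // doWork: inLock(controllerLock)
              try cond.await()                 // awaitTopicDeletionNotification
              catch { case _: InterruptedException => () }
              finally controllerLock.unlock()  // await() must reacquire the lock before returning
            }
            shutdownLatch.countDown()          // never reached while another thread holds the lock
          }
        }, "delete-topics-thread")

        def main(args: Array[String]): Unit = {
          deleteTopicsThread.start()
          Thread.sleep(200)                    // let the worker park on the condition
          controllerLock.lock()                // handleNewSession: inLock(controllerLock)
          try {
            running = false                    // like ShutdownableThread.shutdown
            deleteTopicsThread.interrupt()
            shutdownLatch.await()              // deadlock: the worker cannot reacquire the lock
          } finally controllerLock.unlock()
        }
      }

    Running jstack against this sketch reproduces the two production stacks: one thread waiting on a CountDownLatch, the other parked while reacquiring the ReentrantLock inside ConditionObject.await.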

    In short: after the ZooKeeper connection drops and a new session is established, the controller runs onControllerResignation to clean up its previous state and then re-elects. onControllerResignation waits for DeleteTopicsThread to exit, but DeleteTopicsThread is stuck in doWork, so the whole controller path hangs; doWork only completes in a handful of situations, which is why the failure looks random. The bug is fixed in newer Kafka versions (0.9 and later), where the shutdown method becomes:

      /**
       * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
       */
      def shutdown() {
        // Only allow one shutdown to go through
        if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
          // Resume the topic deletion so it doesn't block on the condition
          resumeTopicDeletionThread()
          // Await delete topic thread to exit
          deleteTopicsThread.awaitShutdown()
          topicsToBeDeleted.clear()
          partitionsToBeDeleted.clear()
          topicsIneligibleForDeletion.clear()
        }
      }

    where initiateShutdown and awaitShutdown are:

      def initiateShutdown(): Boolean = {
        if (isRunning.compareAndSet(true, false)) {
          info("Shutting down")
          isRunning.set(false)
          if (isInterruptible)
            interrupt()
          true
        } else
          false
      }

      /**
       * After calling initiateShutdown(), use this API to wait until the shutdown is complete
       */
      def awaitShutdown(): Unit = {
        shutdownLatch.await()
        info("Shutdown completed")
      }

    In other words, the 0.8 shutdown method was split into initiateShutdown and awaitShutdown, with resumeTopicDeletionThread called in between so the delete thread can no longer stay blocked in doWork. Upgrading to at least 0.9 resolves the problem; the official upgrade procedure:

    Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0

    0.9.0.0 has potential breaking changes (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker, downstream clusters should be upgraded first as well.

    For a rolling upgrade:

    1. Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X
    2. Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0.
    4. Restart the brokers one by one for the new protocol version to take effect

    Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

    Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.

    For details, see: http://kafka.apache.org/documentation/#upgrade_9
