zoukankan      html  css  js  c++  java
  • kafka多线程消费topic的问题

    案例:

      topic:my-topic,分区:6

      消费者:部署三台机器,每台机器上面开启6个线程消费。

      消费结果:只有一台机器可以正常消费,另外两台机器直接输出六条告警日志:

    No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-3 for topic my-topic
    No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-1 for topic my-topic
    No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-2 for topic my-topic
    No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-4 for topic my-topic
    No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-6 for topic my-topic
    No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-5 for topic my-topic
    

    在源码分析之前,先给个图示吧,花了两个小时才画完。

    源码分析:

    for (topic <- ctx.myTopicThreadIds.keySet) {
       // curConsumers = 6*3 = 18,当前消费者数量
          val curConsumers = ctx.consumersForTopic(topic)
       // curPartitions = 6,当前分区数量
          val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)
       // nPartsPerConsumer = 6/18 = 0,平均每个消费者能分到的分区数【取整】
          val nPartsPerConsumer = curPartitions.size / curConsumers.size
       /*
             nConsumersWithExtraPart = 6%18 = 6,如果分割不均匀(消费者和分区数不是倍数关系),那么前N个消费者将会消费一个额外的分区
             这里得出结果是6,那么其含义可以理解为前6个消费者可以比其他消费多消费一个分区,前6个各占有一个分区,后面12个消费者各占有0个分区
           */
          val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
    
          info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
            " for topic " + topic + " with consumers: " + curConsumers)
    
          for (consumerThreadId <- curConsumers) {
        // myConsumerPosition是指当前consumerThreadId在消费者集合中的位置
            val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
            assert(myConsumerPosition >= 0)
        /*
                 startPart = 0*6 + myConsumerPosition.min(6),min函数表示取两个数值中小的一个,那么startPart的值就分成了两个部分:[0-5] -> 0-5,[6-17] -> 6
                 分区升序排列之后,startPart表示当前消费者从哪个分区开始消费。
             */
            val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        /*
                 nParts = 0 + (myConsumerPosition + 1 > 6 ) ? 0 : 1 ,这里nParts的值也分成了两部分,[0-5] -> 1 , [6-17] -> 0
                如果消费者数量小于分区数量,则前nConsumersWithExtraPart个消费者的分区数量会是2,nParts只会有三种值【0,1,2】,
                表示当前消费者可以消费分区的数量。
                
            */
            val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
    
            /**
             *   Range-partition the sorted partitions to consumers for better locality.
             *  The first few consumers pick up an extra partition, if any.
             */
        // 这里myConsumerPosition在[6-17]的comsumer都会直接告警,也就是上文提到的【额外部分消费者】
            if (nParts <= 0)
              warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
            else {
          // 这里myConsumerPosition在[0-5]的comsumer进入topic分区分配
              for (i <- startPart until startPart + nParts) {
                val partition = curPartitions(i)
                info(consumerThreadId + " attempting to claim partition " + partition)
                // record the partition ownership decision
                val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
                assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
              }
            }
          }
    }

    结果:topic里面的每个partition只会由一个线程消费,在分配的时候就已经指定好,如果有消费者线程加入或者退出,则会重新开始分配。

  • 相关阅读:
    【题解】[Codeforces 407B] Long Path / doughnut【20201030 CSP 模拟赛】【DP】
    Powerful number 筛略解
    【题解】[Codeforces 1400E] Clear the Multiset
    安卓中Activity的onStart()和onResume()的区别是什么
    Android TextView自动换行文字排版参差不齐的原因
    Android 异步加载解决方案
    Android Camera 相机程序编写
    关于android中EditText边框的问题 下划线
    getDimension,getDimensionPixelOffset和getDimensionPixelSize的一点说明
    android dimens 读取 px&dp问题
  • 原文地址:https://www.cnblogs.com/yucy/p/6973493.html
Copyright © 2011-2022 走看看