zoukankan      html  css  js  c++  java
  • Kafka消费者组再均衡问题

    在Kafka中,当有新消费者加入或者订阅的topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:

    第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。

    第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。

    所以对于Rebalance来说,Coordinator起着至关重要的作用,那么怎么查看消费者对应的Coordinator呢,我们知道某个消费者组对应__consumer_offsets中的哪个Partation是通过hash计算出来的:partation=hash("test_group_1")%50=28,表示test_group_1这个消费者组属于28号partation,通过命令:

    ./kafka-topics.sh --zookeeper 192.168.33.11:2181 --describe --topic __consumer_offsets

    可以找到28号Partation所对应的信息:

    从而可以知道coordinator对应的broker为1

    在Rebalance期间,消费者会出现无法读取消息,造成整个消费者群组一段时间内不可用,假设现在消费者组当中有A,代码逻辑执行10s,如果消费者组在消费的过程中consumer B加入到了该消费者组,并且B的代码逻辑执行20s,那么当A处理完后先进入Rebalance状态等待,只有当B也处理完后,A和B才真正通过Rebalance重新分配,这样显然A在等待的过程中浪费了资源。

    消费者A:

     1 """
     2 consumer_rebalance_a.py a消费者
     3 """
     4 import pickle
     5 import uuid
     6 import time
     7 from kafka import KafkaConsumer
     8 from kafka.structs import TopicPartition, OffsetAndMetadata
     9 from kafka import ConsumerRebalanceListener
    10 
    11 consumer = KafkaConsumer(
    12     bootstrap_servers=['192.168.33.11:9092'],
    13     group_id="test_group_1",
    14     client_id="{}".format(str(uuid.uuid4())),
    15     enable_auto_commit=False,
    16     key_deserializer=lambda k: pickle.loads(k),
    17     value_deserializer=lambda v: pickle.loads(v)
    18 )
    19 
    20 # 用来记录最新的偏移量信息.
    21 consumer_offsets = {}
    22 
    23 
    24 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
    25     def on_partitions_revoked(self, revoked):
    26         """
    27         再均衡开始之前 下一轮poll之前触发
    28         :param revoked:
    29         :return:
    30         """
    31         print('再均衡开始之前被自动触发.')
    32         print(revoked, type(revoked))
    33         consumer.commit_async(offsets=consumer_offsets)
    34 
    35     def on_partitions_assigned(self, assigned):
    36         """
    37         再均衡完成之后  即将下一轮poll之前 触发
    38         :param assigned:
    39         :return:
    40         """
    41         print('在均衡完成之后自动触发.')
    42         print(assigned, type(assigned))
    43 
    44 
    45 consumer.subscribe(topics=('round_topic',), listener=MineConsumerRebalanceListener())
    46 
    47 
    48 def _on_send_response(*args, **kwargs):
    49     """
    50     提交偏移量涉及回调函数
    51     :param args: 
    52     :param kwargs:
    53     :return:
    54     """
    55     if isinstance(args[1], Exception):
    56         print('偏移量提交异常. {}'.format(args[1]))
    57     else:
    58         print('偏移量提交成功')
    59 
    60 
    61 try:
    62     start_time = time.time()
    63     while True:
    64         # 再均衡其实是在poll之前完成的
    65         consumer_records_dict = consumer.poll(timeout_ms=100)
    66 
    67         # 处理逻辑.
    68         for k, record_list in consumer_records_dict.items():
    69             for record in record_list:
    70                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
    71                     record.topic, record.partition, record.offset, record.key, record.value)
    72                 )
    73 
    74                 consumer_offsets[
    75                     TopicPartition(record.topic, record.partition)
    76                 ] = OffsetAndMetadata(
    77                     record.offset + 1, metadata='偏移量.'
    78                 )
    79 
    80         try:
    81             consumer.commit_async(callback=_on_send_response)
    82             time.sleep(10)
    83         except Exception as e:
    84             print('commit failed', str(e))
    85 
    86 except Exception as e:
    87     print(str(e))
    88 finally:
    89     try:
    90         # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
    91         consumer.commit()
    92         print("同步补救提交成功")
    93     except Exception as e:
    94         consumer.close()

     消费者B:

      1 """
      2 consumer b.py 消费者B
      3 """
      4 
      5 import pickle
      6 import uuid
      7 import time
      8 from kafka import KafkaConsumer
      9 from kafka.structs import TopicPartition, OffsetAndMetadata
     10 from kafka import ConsumerRebalanceListener
     11 
     12 consumer = KafkaConsumer(
     13     bootstrap_servers=['192.168.33.11:9092'],
     14     group_id="test_group_1",
     15     client_id="{}".format(str(uuid.uuid4())),
     16     enable_auto_commit=False,  # 设置为手动提交偏移量.
     17     key_deserializer=lambda k: pickle.loads(k),
     18     value_deserializer=lambda v: pickle.loads(v)
     19 )
     20 
     21 consumer_offsets = {}  # 用来记录最新的偏移量信息.
     22 
     23 
     24 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
     25     def on_partitions_revoked(self, revoked):
     26         """
     27         再均衡开始之前 下一轮poll之前触发
     28         :param revoked:
     29         :return:
     30         """
     31         print('再均衡开始之前被自动触发.')
     32         print(revoked, type(revoked))
     33         consumer.commit_async(offsets=consumer_offsets)
     34 
     35     def on_partitions_assigned(self, assigned):
     36         """
     37         再均衡完成之后  即将下一轮poll之前 触发
     38         :param assigned:
     39         :return:
     40         """
     41 
     42         print('在均衡完成之后自动触发.')
     43         print(assigned, type(assigned))
     44 
     45 
     46 consumer.subscribe(topics=('round_topic',), listener=MineConsumerRebalanceListener())
     47 
     48 
     49 def _on_send_response(*args, **kwargs):
     50     """
     51     提交偏移量涉及回调函数
     52     :param args: 
     53     :param kwargs:
     54     :return:
     55     """
     56 
     57     if isinstance(args[1], Exception):
     58         print('偏移量提交异常. {}'.format(args[1]))
     59     else:
     60         print('偏移量提交成功')
     61 
     62 
     63 try:
     64     start_time = time.time()
     65     while True:
     66         # 再均衡其实是在poll之前完成的
     67         consumer_records_dict = consumer.poll(timeout_ms=100)
     68 
     69         record_num = 0
     70         for key, record_list in consumer_records_dict.items():
     71             for record in record_list:
     72                 record_num += 1
     73         print("---->当前批次获取到的消息个数是:{}".format(record_num))
     74 
     75         # 处理逻辑.
     76         for k, record_list in consumer_records_dict.items():
     77             for record in record_list:
     78                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
     79                     record.topic, record.partition, record.offset, record.key, record.value)
     80                 )
     81 
     82                 consumer_offsets[
     83                     TopicPartition(record.topic, record.partition)
     84                 ] = OffsetAndMetadata(record.offset + 1, metadata='偏移量.')
     85 
     86         try:
     87             # 轮询一个batch 手动提交一次
     88             consumer.commit_async(callback=_on_send_response)
     89             time.sleep(20)
     90         except Exception as e:
     91             print('commit failed', str(e))
     92 
     93 except Exception as e:
     94     print(str(e))
     95 finally:
     96     try:
     97         # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
     98         consumer.commit()
     99         print("同步补救提交成功")
    100     except Exception as e:
    101         consumer.close()

    消费者A和消费者B是同一个消费者组(test_group_1)的两个消费者,用time.sleep的方式模拟执行时间,A:10s,B:20s;首先A开始消费,当B新加入消费者组的时候会触发Rebalance,可以通过实现再均衡监听器(RebalanceListener)中的on_partitions_revoked和on_partitions_assigned方法来查看再均衡触发前后的partition变化情况,依次启动消费者A和B之后:

    消费者A:
    再均衡开始之前被自动触发.
    {TopicPartition(topic='round_topic', partition=0), TopicPartition(topic='round_topic', partition=1), TopicPartition(topic='round_topic', partition=2)} <class 'set'>
    <----------------------------------------
    ---------------------------------------->
    在均衡完成之后自动触发.
    {TopicPartition(topic='round_topic', partition=0), TopicPartition(topic='round_topic', partition=1)} <class 'set'>
    <----------------------------------------
    
    
    消费者B:
    再均衡开始之前被自动触发.
    set() <class 'set'>
    <----------------------------------------
    ---------------------------------------->
    在均衡完成之后自动触发.
    {TopicPartition(topic='round_topic', partition=2)} <class 'set'>
    <----------------------------------------

    在等待B的逻辑执行完后,A和B进入再均衡状态;再均衡前A处于partition 0、1、 2三个分区,B不占有任何partition;当再均衡结束后,A占有partition 0、1,B占有partition 2;然后A和B分别开始消费对应的partition。

    在上述消费者A和B的代码中重写了RebalanceListener,主要是为了在发生再均衡之前提交最后一个已经处理记录的偏移量,因为再均衡时消费者将失去对一个分区的所有权,如果消费者已经消费了当前partition还没提交offset,这时候发生再均衡会使得消费者重新分配partition,可能使得同一个消息先后被两个消费者消费的情况,实现MineConsumerRebalanceListener再均衡前提交一次offset,确保每一个消费者在触发再均衡前提交最后一次offset:

     1 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
     2     def on_partitions_revoked(self, revoked):
     3         """
     4         再均衡开始之前 下一轮poll之前触发
     5         :param revoked:
     6         :return:
     7         """
     8         print('再均衡开始之前被自动触发.')
     9         print(revoked, type(revoked))
    10         consumer.commit_async(offsets=consumer_offsets)
    11 
    12     def on_partitions_assigned(self, assigned):
    13         """
    14         再均衡完成之后  即将下一轮poll之前 触发
    15         :param assigned:
    16         :return:
    17         """
    18 
    19         print('在均衡完成之后自动触发.')
    20         print(assigned, type(assigned))

    再均衡发生的场景有以下几种:

    1. 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了)
    2. 订阅主题数发生变更,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
    3. 订阅主题的分区数发生变更
    鉴于触发再均衡后会造成资源浪费的问题,所以我们尽量不要触发再均衡
  • 相关阅读:
    python_3 装饰器之初次见面
    python_迭代器
    Python_1生成器(下)之单线并行--生产着消费者模型
    Python_ 1生成器(上)初识生成器
    memcache 和 redis 的区别
    Linux 面试总结
    网络面试总结
    操作系统相关面试总结
    剑指offer 数组中的重复数字
    svn-主副分支使用
  • 原文地址:https://www.cnblogs.com/FG123/p/10095125.html
Copyright © 2011-2022 走看看