zoukankan      html  css  js  c++  java
  • kafka 不同的consumer需要使用不同的group id

    打算在一个项目同时使用两个consumer消费两个topic,在配置文件中配置了consumer的默认groupid,未给两个consumer指定各自的groupid,于是两个consumer都使用同一个groupid

    # 指定默认消费者group id
    spring.kafka.consumer.group-id=test-message-group

    但在断点调试过程中发现两个consumer偶尔正常工作,偶尔却在不断的rebanlance,并且伴随着心跳发送失败。特别在频繁读取数据或者断点调试时间比较长的时候频繁出现。而在一个consumer无法成功rebanlance时,无法消费数据。

    异常状态的输出日志如下,一个consumer无法完成rebanlance,另一个conumser则无法发送心跳

    [Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing
    [Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing
    [Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing
    [Consumer clientId=consumer-2, groupId=test-message-group] (Re-)joining group
    [Consumer clientId=consumer-2, groupId=test-message-group] (Re-)joining group
    [Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing
    [Consumer clientId=consumer-1, groupId=test-message-group] Attempt to heartbeat failed since group is rebalancing

    两个consumer正常状态下的输出日志如下,均显示为 Successfully joined group with generation XXX

    [Consumer clientId=consumer-1, groupId=test-message-group] (Re-)joining group
    [Consumer clientId=consumer-2, groupId=test-message-group] Successfully joined group with generation 125
    [Consumer clientId=consumer-1, groupId=test-message-group] Successfully joined group with generation 125
    [Consumer clientId=consumer-1, groupId=test-message-group] Setting newly assigned partitions: HolderMsg-0, HolderMsg-1, HolderMsg-2
    [Consumer clientId=consumer-2, groupId=test-message-group] Setting newly assigned partitions: TcMsg-2, TcMsg-0, TcMsg-1
    [Consumer clientId=consumer-1, groupId=test-message-group] Setting offset for partition HolderMsg-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
    [Consumer clientId=consumer-2, groupId=test-message-group] Setting offset for partition TcMsg-2 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
    [Consumer clientId=consumer-1, groupId=test-message-group] Setting offset for partition HolderMsg-1 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
    [Consumer clientId=consumer-2, groupId=test-message-group] Setting offset for partition TcMsg-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
    [Consumer clientId=consumer-1, groupId=test-message-group] Setting offset for partition HolderMsg-2 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}
    [Consumer clientId=consumer-2, groupId=test-message-group] Setting offset for partition TcMsg-1 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.202.128:9092 (id: 0 rack: null), epoch=0}}

    在反复的调试过程中,我意识到这两个consumer反复重新加入group中,而他们的group是同一个,猜测是两个consumer使用同一个groupid导致,相互影响。在分别为两个consumer指定了单独的groupid之后,异常的情况没有再出现。

    基于这种现象,查询了一些资料,在一篇blog中找到了比较详细的讲解

    https://olnrao.wordpress.com/2015/05/15/apache-kafka-case-of-mysterious-rebalances/

    文章中提到,消费者在zookeeper中注册中,消费者注册标识符(Consumer Identifiers Registry)是保存在zookeeper的/consumers/[group_id]/ids/[consumer_connector_id]的路径下,这些消费者注册节点形成一棵树,当有消费者加入或离开时,树上所有的消费者都会被通知到,从而进行rebanlance。

    消费者在zookeeper注册的路径与topic并没有关系,反而与groupid绑定,这是因为同一个consumer可以消费不同的topic。如果不同的consumer使用同一个groupid消费不同的topic,而任何一个topic的consumer出现加入或离开等变化时,所有groupid组里的consumer都会发生rebanlance。从而可能导致上面调试时出现的问题。

    所以kafka 不同的consumer需要使用不同的group id,以减小相互之间的影响。

  • 相关阅读:
    flash不保存在IE缓存中
    pv 3d
    ASP.NET优化
    根据文件名判断文件扩展名 和 根据流真正判断文件类型的关键函数
    as3.0
    编码
    AS3中Event的target和currentTarget
    显示用户IP来源的代码
    启用IIS的Gzip压缩功能
    Css之多个子元素水平平均分布——flex布局法
  • 原文地址:https://www.cnblogs.com/zhaoshizi/p/12297646.html
Copyright © 2011-2022 走看看