zoukankan      html  css  js  c++  java
  • kafka的reblance机制

    一、什么是kafka的Rebalance

      kafka集群模式下,一个topic有多个partition,对于消费端,可以有多个consumer同时消费这些partition。为了保证大体上partition和consumer的均衡性,提升topic的并发消费能力,所以会有Rebalance。Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。

    二、什么时机触发Rebalance

    0.10kafka的rebalance条件

    • 条件1:有新的consumer加入
    • 条件2:旧的consumer挂了
    • 条件3:coordinator挂了,集群选举出新的coordinator(0.10 特有的)
    • 条件4:topic的partition新加
    • 条件5:consumer调用unsubscrible(),取消topic的订阅

    当一个group中,有consumer加入或者离开时,会触发partitions均衡。Kafka的Consumer Rebalance方案是基于Zookeeper的Watcher来实现的。consumer启动的时候,在zk下都维护一个”/consumers/[group_name]/ids”路径,在此路径下,使用临时节点记录属于此cg的消费者的Id,该Id信息由对应的consumer在启动时创建。每个consumer都会在此路径下简历一个watcher,当有节点发生变化时,就会触发watcher,然后触发Rebalance过程。

    三、0.9之前kafka的Rebalance算法

    Consumer rebalacne算法:

    1. 将目标 topic 下的所有 partirtion 排序,存于PT
    2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
    3. N=size(PT)/size(CG),向上取整
    4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
    5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci  

    在Rebalance期间,consumer不能正常消费,并且这种Rebalance过程强依赖zk,存在以下问题:

    • herd effect(羊群效应):一个被Watch的zk节点变化,导致大量的watcher通知需要被发送给客户端,这会导致在通知期间其他操作的延迟。
    • split brain:每个Consumer都是通过zk中保存的元数据来判断group中各其他成员的状态,以及broker的状态,进而分别进入各自的Rebalance,执行各自的Rebalance逻辑。不同的Consumer在同一时刻可能连接在不同的zk服务器上,看到的元数据就可能不一样,基于不一样的元数据,执行Rebalance就会产生不一致(冲突)的Rebalance结果,Rebalance的冲突,会到导致consumer的rebalance失败。
    • 重复消费问题:因为Rebalance时,很有可能导致offset commit不成功,所以可能造成重复消费问题。

    解决办法:

    • 加大Rebalance的重试时间:"rebalance.backoff.ms=5000"
    • 加大Rebalance失败的retry次数: "rebalance.max.retries=10"
    • 捕获"ConsumerRebalanceFailedException",退出程序。

    四、0.9后kafka对Rebalance过程进行了改进

    Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。Kafka在0.9之前是基于Zookeeper来存储Partition的Offset信息(consumers/{group}/offsets/{topic}/{partition}),因为ZK并不适用于频繁的写操作,所以在0.9之后通过内置Topic的方式来记录对应Partition的Offset。

    每个Group都会选择一个Coordinator来完成自己组内各Partition的Offset信息。那么consumer group如何确定自己的coordinator是谁呢? 简单来说分为两步:

    • 确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式:
      •   __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
    • 该分区leader所在的broker就是被选定的coordinator。

    前面说过, rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题:

    • Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着
    • LeaveGroup请求:主动告诉coordinator我要离开consumer group
    • SyncGroup请求:group leader把分配方案告诉组内所有成员
    • JoinGroup请求:成员请求加入组
    • DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等

    rebalance过程分为2步:Join和Sync

    1 Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

    2 Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

    五、如何避免不必要的Rebalance

    除开consumer正常的添加和停掉导致rebalance外,在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被“踢出”Group,导致rebalance,这种情况应该避免。

    第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过)

    • 设置 session.timeout.ms = 6s。
    • 设置 heartbeat.interval.ms = 2s。
    • 要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
    • 这两个参数的区别 https://stackoverflow.com/questions/43881877/difference-between-heartbeat-interval-ms-and-session-timeout-ms-in-kafka-consume

    将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。

    第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

  • 相关阅读:
    svn cleanup failed–previous operation has not finished 解决方法
    开源SNS社区系统推荐
    从网络获取图片本地保存
    MS SQL Server 数据库连接字符串
    KeepAlive
    Configure Git in debian
    sqlserver query time
    RPi Text to Speech (Speech Synthesis)
    SQL Joins with C# LINQ
    search or reseed identity columns in sqlserver 2008
  • 原文地址:https://www.cnblogs.com/guoyu1/p/13921033.html
Copyright © 2011-2022 走看看