zoukankan      html  css  js  c++  java
  • kafka的reblance机制

    一、什么是kafka的Rebalance

      kafka集群模式下,一个topic有多个partition,对于消费端,可以有多个consumer同时消费这些partition。为了保证大体上partition和consumer的均衡性,提升topic的并发消费能力,所以会有Rebalance。Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。

    二、什么时机触发Rebalance

    0.10kafka的rebalance条件

    • 条件1:有新的consumer加入
    • 条件2:旧的consumer挂了
    • 条件3:coordinator挂了,集群选举出新的coordinator(0.10 特有的)
    • 条件4:topic的partition新加
    • 条件5:consumer调用unsubscrible(),取消topic的订阅

    当一个group中,有consumer加入或者离开时,会触发partitions均衡。Kafka的Consumer Rebalance方案是基于Zookeeper的Watcher来实现的。consumer启动的时候,在zk下都维护一个”/consumers/[group_name]/ids”路径,在此路径下,使用临时节点记录属于此cg的消费者的Id,该Id信息由对应的consumer在启动时创建。每个consumer都会在此路径下简历一个watcher,当有节点发生变化时,就会触发watcher,然后触发Rebalance过程。

    三、0.9之前kafka的Rebalance算法

    Consumer rebalacne算法:

    1. 将目标 topic 下的所有 partirtion 排序,存于PT
    2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
    3. N=size(PT)/size(CG),向上取整
    4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
    5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci  

    在Rebalance期间,consumer不能正常消费,并且这种Rebalance过程强依赖zk,存在以下问题:

    • herd effect(羊群效应):一个被Watch的zk节点变化,导致大量的watcher通知需要被发送给客户端,这会导致在通知期间其他操作的延迟。
    • split brain:每个Consumer都是通过zk中保存的元数据来判断group中各其他成员的状态,以及broker的状态,进而分别进入各自的Rebalance,执行各自的Rebalance逻辑。不同的Consumer在同一时刻可能连接在不同的zk服务器上,看到的元数据就可能不一样,基于不一样的元数据,执行Rebalance就会产生不一致(冲突)的Rebalance结果,Rebalance的冲突,会到导致consumer的rebalance失败。
    • 重复消费问题:因为Rebalance时,很有可能导致offset commit不成功,所以可能造成重复消费问题。

    解决办法:

    • 加大Rebalance的重试时间:"rebalance.backoff.ms=5000"
    • 加大Rebalance失败的retry次数: "rebalance.max.retries=10"
    • 捕获"ConsumerRebalanceFailedException",退出程序。

    四、0.9后kafka对Rebalance过程进行了改进

    Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。Kafka在0.9之前是基于Zookeeper来存储Partition的Offset信息(consumers/{group}/offsets/{topic}/{partition}),因为ZK并不适用于频繁的写操作,所以在0.9之后通过内置Topic的方式来记录对应Partition的Offset。

    每个Group都会选择一个Coordinator来完成自己组内各Partition的Offset信息。那么consumer group如何确定自己的coordinator是谁呢? 简单来说分为两步:

    • 确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式:
      •   __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
    • 该分区leader所在的broker就是被选定的coordinator。

    前面说过, rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题:

    • Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着
    • LeaveGroup请求:主动告诉coordinator我要离开consumer group
    • SyncGroup请求:group leader把分配方案告诉组内所有成员
    • JoinGroup请求:成员请求加入组
    • DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等

    rebalance过程分为2步:Join和Sync

    1 Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

    2 Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

    五、如何避免不必要的Rebalance

    除开consumer正常的添加和停掉导致rebalance外,在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被“踢出”Group,导致rebalance,这种情况应该避免。

    第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过)

    • 设置 session.timeout.ms = 6s。
    • 设置 heartbeat.interval.ms = 2s。
    • 要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
    • 这两个参数的区别 https://stackoverflow.com/questions/43881877/difference-between-heartbeat-interval-ms-and-session-timeout-ms-in-kafka-consume

    将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。

    第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

  • 相关阅读:
    【uva 1442】Cav(算法效率)
    【uva 10600】ACM Contest and Blackout(图论--次小生成树 模版题)
    【bzoj2429】[HAOI2006]聪明的猴子(图论--最小瓶颈生成树 模版题)
    【uva 534】Frogger(图论--最小瓶颈路 模版题)
    【poj 1988】Cube Stacking(图论--带权并查集)
    【uva 12174】Shuffle(算法效率--滑动窗口)
    关于最小生成树 Kruskal 和 Prim 的简述(图论)
    2019牛客暑期多校训练营(第五场) maximum clique 1
    左偏树/可并堆 学习笔记
    树的计数 Prüfer编码与Cayley公式 学习笔记
  • 原文地址:https://www.cnblogs.com/guoyu1/p/13921033.html
Copyright © 2011-2022 走看看