- 所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常
- 很多提交位移的 API 方法是支持自动错误重试的,比如我们在上一期中提到的commitSync 方法
- 异常解释
- 本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例
- 你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法
- 在后半部分,社区给出了两个相应的解决办法(即橙色字部分):
- 增加期望的时间间隔 max.poll.interval.ms 参数值。
- 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值
- CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时
- 场景一
- 当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常
- 你只需要写一个 Consumer 程序,使用 KafkaConsumer.subscribe 方法随意订阅一个主题
- 之后设置 Consumer 端参数 max.poll.interval.ms=5 秒
- 最后在循环调用 KafkaConsumer.poll 方法之间,插入 Thread.sleep(6000) 和手动提交位移,就可以成功复现这个异常了
Properties props = new Properties();…props.put("max.poll.interval.ms", 5000);consumer.subscribe(Arrays.asList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // 使用 Thread.sleep 模拟真实的消息处理逻辑 Thread.sleep(6000L); consumer.commitSync();}
- 如果要防止这种场景下抛出异常,你需要简化你的消息处理逻辑。具体来说有 4 种方法
- 1、缩短单条消息处理的时间。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
- 2、增加 Consumer 端允许下游系统消费一批消息的最大时长。这取决于 Consumer 端参数 max.poll.interval.ms 的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms 参数的值。不幸的是,session.timeout.ms 参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms 参数,将这部分含义从 session.timeout.ms 中剥离出来的原因之一。
- 3、减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。
- 4、下游系统使用多线程来加速消费。这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsumerThread 线程,自行处理多线程间的数据消费。不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错
- 综合以上这 4 个处理方法,我个人推荐你首先尝试采用方法 1 来预防此异常的发生。优化下游系统的消费逻辑是百利而无一害的法子,不像方法 2、3 那样涉及到 Kafka Consumer 端 TPS 与消费延时(Latency)的权衡。如果方法 1 实现起来有难度,那么你可以按照下面的法则来实践方法 2、3
- 首先,你需要弄清楚你的下游系统消费每条消息的平均延时是多少。比如你的消费逻辑是从 Kafka 获取到消息后写入到下游的 MongoDB 中,假设访问 MongoDB 的平均延时不超过 2 秒,那么你可以认为消息处理需要花费 2 秒的时间。如果按照 max.poll.records 等于 500 来计算,一批消息的总消费时长大约是 1000 秒,因此你的 Consumer 端的 max.poll.interval.ms 参数值就不能低于 1000 秒。如果你使用默认配置,那默认值 5 分钟显然是不够的,你将有很大概率遭遇 CommitFailedException 异常。将 max.poll.interval.ms 增加到 1000 秒以上的做法就属于上面的第 2 种方法。
- 除了调整 max.poll.interval.ms 之外,你还可以选择调整 max.poll.records 值,减少每次 poll 方法返回的消息数。还拿刚才的例子来说,你可以设置 max.poll.records 值为 150,甚至更少,这样每批消息的总消费时长不会超过 300 秒(150*2=300),即 max.poll.interval.ms 的默认值 5 分钟。这种减少 max.poll.records 值的做法就属于上面提到的方法 3。
- 当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常
- 场景二
- 了解这个冷门场景,可以帮助你拓宽 Kafka Consumer 的知识面,也能提前预防一些古怪的问题
- 学习 Kafka 的消费者,不过大都集中在消费者组上,即所谓的 Consumer Group。其实,Kafka Java Consumer 端还提供了一个名为 Standalone Consumer 的独立消费者
- 消费者组和独立消费者在使用之前都要指定 group.id
- 现在问题来了,如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员
- 虽然说这个场景很冷门,但也并非完全不会遇到。在一个大型公司中,特别是那些将 Kafka 作为全公司级消息引擎系统的公司中,每个部门或团队都可能有自己的消费者应用,谁能保证各自的 Consumer 程序配置的 group.id 没有重复呢?
- 了解这个冷门场景,可以帮助你拓宽 Kafka Consumer 的知识面,也能提前预防一些古怪的问题
- 小结