- 对于 Kafka 消费者来说,最重要的事情就是监控它们的消费进度了,或者说是监控它们消费的滞后程度
- 这个滞后程度有个专门的名称:消费者 Lag
- 所谓滞后程度,就是指消费者当前落后于生产者的程度
- Lag 的单位是消息数,而且我们一般是在主题这个级别上讨论 Lag 的
- 但实际上,Kafka 监控 Lag 的层级是在分区上的
- 如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值
- Lag 应该算是最最重要的监控指标
- 一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小
- 由于消费者的速度无法匹及生产者的速度,极有可能导致它消费的数据已经不在操作系统的页缓存中了,那么这些数据就会失去享有 Zero Copy 技术的资格
- 消费者就不得不从磁盘上读取它们,这就进一步拉大了与生产者的差距,进而出现马太效应
- 你在实际业务场景中必须时刻关注消费者的消费进度。一旦出现 Lag 逐步增加的趋势,一定要定位问题,及时处理,避免造成业务损失
- 既然消费进度这么重要,我们应该怎么监控它呢?简单来说,有 3 种方法。
- 1、使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
- 2、使用 Kafka Java Consumer API 编程。
- 3、使用 Kafka 自带的 JMX 监控指标
- Kafka 自带命令
- 使用 Kafka 自带的命令行工具 bin/kafka-consumer-groups.sh(bat)
- kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具
- 使用 kafka-consumer-groups 脚本很简单。该脚本位于 Kafka 安装目录的 bin 子目录下,我们可以通过下面的命令来查看某个给定消费者的 Lag 值
- $ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 连接信息 > --describe --group <group 名称 >
- Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id值
- 在运行命令时,我指定了 Kafka 集群的连接信息,即 localhost:9092。另外,我还设置了要查询的消费者组名:testgroup。kafka-consumer-groups 脚本的输出信息很丰富。首先,它会按照消费者组订阅主题的分区进行展示,每个分区一行数据;其次,除了主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(即 LOG-END-OFFSET 列值)、该消费者组当前最新消费消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前两者的差值)、消费者实例 ID、消费者连接 Broker 的主机名以及消费者的 CLIENT-ID 信息
- 有的时候,你运行这个脚本可能会出现下面这种情况
- 简单比较一下,我们很容易发现它和前面那张图输出的区别,即 CONSUMER-ID、HOST 和 CLIENT-ID 列没有值!如果碰到这种情况,你不用惊慌,这是因为我们运行 kafka-consumer-groups 脚本时没有启动消费者程序。请注意我标为橙色的文字,它显式地告诉我们,当前消费者组没有任何 active 成员,即没有启动任何消费者实例。虽然这些列没有值,但 LAG 列依然是有效的,它依然能够正确地计算出此消费者组的 Lag 值。
- 除了上面这三列没有值的情形,还可能出现的一种情况是该命令压根不返回任何结果。此时,你也不用惊慌,这是因为你使用的 Kafka 版本比较老,kafka-consumer-groups 脚本还不支持查询非 active 消费者组。一旦碰到这个问题,你可以选择升级你的 Kafka 版本,也可以采用我接下来说的其他方法来查询
- 使用 Kafka 自带的命令行工具 bin/kafka-consumer-groups.sh(bat)
- Kafka Java Consumer API
- 用程序的方式自动化监控
- 社区提供的 Java Consumer API 分别提供了查询当前分区最新消息位移和消费者组最新消费消息位移两组方法,我们使用它们就能计算出对应的 Lag
1 public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { 2 Properties props = new Properties(); 3 props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 4 try (AdminClient client = AdminClient.create(props)) { 5 ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); 6 try { 7 Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); 8 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移 9 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); 10 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 11 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 12 try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { 13 Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); 14 return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), 15 entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); 16 } 17 } catch (InterruptedException e) { 18 Thread.currentThread().interrupt(); 19 // 处理中断异常 20 // ... 21 return Collections.emptyMap(); 22 } catch (ExecutionException e) { 23 // 处理 ExecutionException 24 // ... 25 return Collections.emptyMap(); 26 } catch (TimeoutException e) { 27 throw new TimeoutException("Timed out when getting lag for consumer group " + groupID); 28 } 29 } 30 }
- 不过请注意,这段代码只适用于 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中没有 AdminClient.listConsumerGroupOffsets 方法
- Kafka JMX 监控指标
- 我们借助的往往是现成的监控框架
- 因为它们都不能集成进已有的监控框架中,如 Zabbix 或 Grafana
- 下面我们就来看第三种方法,使用 Kafka 默认提供的 JMX 监控指标来监控消费者的 Lag 值
- Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标
- 和我们今天所讲内容相关的有两组属性:records-lag-max 和 records-lead-min,它们分别表示此消费者在测试窗口时间内曾经达到的最大的 Lag 值和最小的 Lead 值
- 这里的 Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理
- 在实际生产环境中,请你一定要同时监控 Lag 值和 Lead 值
- 一旦你监测到 Lead 越来越小,甚至是快接近于 0 了,你就一定要小心了,这可能预示着消费者端要丢消息了
- Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。倘若你的消费者程序足够慢,慢到它要消费的数据快被 Kafka 删除了,这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形
- JConsole 工具监控此 JMX 指标的截图
- Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值
- JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。
- 在我们的例子中,client-id 还是 consumer-1,主题和分区分别是 test 和 0。下图展示出了分区级别的 JMX 指标:
- 分区级别的 JMX 指标中多了 records-lag-avg 和 records-lead-avg 两个属性,可以计算平均的 Lag 值和 Lead 值。在实际场景中,我们会更多地使用这两个 JMX 指标
- 我们借助的往往是现成的监控框架
- 小结
- 我今天完整地介绍了监控消费者组以及独立消费者程序消费进度的 3 种方法
- 从使用便捷性上看,应该说方法 1 是最简单的,我们直接运行 Kafka 自带的命令行工具即可
- 方法 2 使用 Consumer API 组合计算 Lag,也是一种有效的方法,重要的是它能集成进很多企业级的自动化监控工具中
- 不过,集成性最好的还是方法 3,直接将 JMX 监控指标配置到主流的监控框架就可以了。
- 在真实的线上环境中,我建议你优先考虑方法 3,同时将方法 1 和方法 2 作为备选,装进你自己的工具箱中,随时取出来应对各种实际场景
- 我今天完整地介绍了监控消费者组以及独立消费者程序消费进度的 3 种方法