前言
本文主要讲述一下spring for kafka的consumer在spring.kafka.consumer.enable-auto-commit是false情况下,AckMode的选项,及手动提交分析总结。
AckMode
RECORD
每处理一条commit一次
BATCH(默认)
每次poll的时候批量提交一次,频率取决于每次poll的调用频率
TIME
每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
COUNT
累积达到ackCount次的ack去commit
COUNT_TIME
ackTime或ackCount哪个条件先满足,就commit
MANUAL
listener负责ack,但是背后也是批量上去
MANUAL_IMMEDIATE
listner负责ack,每调用一次,就立即commit
Manual Commit
- 消费端手动提交offset代码如下:
/** * 这是手动提交的消费方式 * @param record * @param ack * @throws Exception */ @KafkaListener(topics = TopicConstants.COMMON_PAY,groupId = "写自己的消费组 id") public void listenXXXPay(ConsumerRecord<String, String> record , Acknowledgment ack) throws Exception { String msg = JSONObject.parseObject(record.value(), String.class); System.out.println(msg); if (new Random().nextInt(100)<50){ logger.info(String.format("kafka 综合收费消费消息成功---------------- listen1 topic = %s, offset = %d, value = %s ", record.topic(), record.offset(), record.value())); ack.acknowledge(); } }
前提要配置AckMode:
instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
- 接下来问题来了, 如果代码中没有进行ack.acknowledge(),会怎么办呢??
消费者在消费消息的过程中,配置参数设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?
1. 如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。
2. 如果在消费的过程中有几条或者一批数据数据没有提交offset,后面其他的消息消费后正常提交offset,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。
3. 消费者如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你不想提交offset的消息时你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。