一、消息消费确认
简单说下kafka消费的逻辑。
当前生产这发送消息到相应的主题topic,消费端可以去监听自己所关注的topic消息,从而实现本地逻辑的流转。
消费的确认的方式:
1、消费端(kafka)自动提交
spring.kafka.consumer.enable-auto-commit=true //这里表示用户无需关注消费的提交,kafka系统会负责帮我们按照一定时间频率提交。
2、消费端手动提交
spring.kafka.consumer.enable-auto-commit=false //这里需要注意,并不是说你这里设置了false就能一定确保消费的offset不会被提交。单纯的设置这个为false是无法保证offset不会被提交。
二、那么如何真正做到消费者offset的没收手动提交就不触发提交呢?
第一:spring.kafka.consumer.enable-auto-commit=false //true的话启动会报错
第二:构建自定义工厂
1 /** 2 * 手动提交 3 * topics订阅的消息主题可以多个{"a","b"} 4 * group-DszConsumer3-1消费组ID 5 * ackContainerFactory自定消费策略 6 */ 7 @KafkaListener(id = "group-DszConsumer3-1", topics = TOPIC, containerFactory = "ackContainerFactory") 8 public void ackListener(ConsumerRecord record, Acknowledgment ack) { 9 String topic = record.topic(); 10 long offset = record.offset(); 11 int partition = record.partition(); 12 log.info("当前消息topic={},offset={},partition={}",topic,offset,partition); 13 //手动提交 14 //ack.acknowledge(); 15 }
自定义消费策略 ackContainerFactory
1 @Configuration 2 public class KafkaConfig { 3 @Bean("ackContainerFactory") 4 public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory) { 5 ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); 6 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动模式 7 factory.setConsumerFactory(consumerFactory); 8 return factory; 9 } 10 11 }
按照以上方式,如果没有手动触发 ack.acknowledge();那么消费组是不费提交已消费的offset,每次重启消费组都会再次消费。
这就是真正实现把是否需要提交offset权限交给开发者,自由控制。
总结:spring.kafka.consumer.enable-auto-commit=false并不是真正意义关闭自动提交,而是吧提交的权限交给Spring。Spring会帮我们去提交,省去用户提交的过程。这就是为什么false的时候明明没有手动提交去还是提交的疑惑。
而spring.kafka.consumer.enable-auto-commit=true则是自动提交权限由kafka自身在控制。