zoukankan      html  css  js  c++  java
  • Kafka(二)关于Kafka设置(spring.kafka.consumer.enable-auto-commit=false)依然提交消息确认的offset问题分析。

    一、消息消费确认

    简单说下kafka消费的逻辑。

    当前生产这发送消息到相应的主题topic,消费端可以去监听自己所关注的topic消息,从而实现本地逻辑的流转。

    消费的确认的方式:

    1、消费端(kafka)自动提交

    spring.kafka.consumer.enable-auto-commit=true //这里表示用户无需关注消费的提交,kafka系统会负责帮我们按照一定时间频率提交。

    2、消费端手动提交

    spring.kafka.consumer.enable-auto-commit=false //这里需要注意,并不是说你这里设置了false就能一定确保消费的offset不会被提交。单纯的设置这个为false是无法保证offset不会被提交。

    二、那么如何真正做到消费者offset的没收手动提交就不触发提交呢?

    第一:spring.kafka.consumer.enable-auto-commit=false //true的话启动会报错

    第二:构建自定义工厂

     1     /**
     2      * 手动提交
     3      * topics订阅的消息主题可以多个{"a","b"}
     4      * group-DszConsumer3-1消费组ID
     5      * ackContainerFactory自定消费策略
     6      */
     7     @KafkaListener(id = "group-DszConsumer3-1", topics = TOPIC, containerFactory = "ackContainerFactory")
     8     public void ackListener(ConsumerRecord record, Acknowledgment ack) {
     9         String topic = record.topic();
    10         long offset = record.offset();
    11         int partition = record.partition();
    12         log.info("当前消息topic={},offset={},partition={}",topic,offset,partition);
    13         //手动提交
    14         //ack.acknowledge();
    15     }

    自定义消费策略 ackContainerFactory

     1 @Configuration
     2 public class KafkaConfig {
     3     @Bean("ackContainerFactory")
     4     public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory) {
     5         ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
     6         factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动模式
     7         factory.setConsumerFactory(consumerFactory);
     8         return factory;
     9     }
    10 
    11 }

    按照以上方式,如果没有手动触发 ack.acknowledge();那么消费组是不费提交已消费的offset,每次重启消费组都会再次消费。

    这就是真正实现把是否需要提交offset权限交给开发者,自由控制。

    总结:spring.kafka.consumer.enable-auto-commit=false并不是真正意义关闭自动提交,而是吧提交的权限交给Spring。Spring会帮我们去提交,省去用户提交的过程。这就是为什么false的时候明明没有手动提交去还是提交的疑惑。

    而spring.kafka.consumer.enable-auto-commit=true则是自动提交权限由kafka自身在控制。

  • 相关阅读:
    MySql
    Docker
    达观数据
    Python面试题
    用Python构造ARP请求、扫描、欺骗
    git上传简单的命令行分析
    vue2自定义指令的作用
    自定义指令详解 vue
    文档打印 js
    通过Export2Zip实现表格内容下载成为excel文件
  • 原文地址:https://www.cnblogs.com/dszazhy/p/14855600.html
Copyright © 2011-2022 走看看