zoukankan      html  css  js  c++  java
  • spring-kafka之KafkaListener注解深入解读

    简介

    Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,也是目前最流行的消息队列系统之一。因此,也越来越多的框架对kafka做了集成,比如本文将要说到的spring-kafka。

    Kafka既然作为一个消息发布订阅系统,就包括消息生成者和消息消费者。本文主要讲述的spring-kafka框架的kafkaListener注解的深入解读和使用案例。

    解读

    源码解读

    @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
    
    @Retention(RetentionPolicy.RUNTIME)
    
    @MessageMapping
    
    @Documented
    
    @Repeatable(KafkaListeners.class)
    
    public @interface KafkaListener {
    
    
    
       /**
    
        * 消费者的id,当GroupId没有被配置的时候,默认id为GroupId
    
        */
    
       String id() default "";
    
    
    
       /**
    
        * 监听容器工厂,当监听时需要区分单数据还是多数据消费需要配置containerFactory      属性
    
        */
    
       String containerFactory() default "";
    
    
    
       /**
    
        * 需要监听的Topic,可监听多个,和 topicPattern 属性互斥
    */
    
       String[] topics() default {};
    
    
    
    
       /**
    
        * 需要监听的Topic的正则表达。和 topics,topicPartitions属性互斥
        */
    
       String topicPattern() default "";
    
    
       /**
    
        * 可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥
        */
    
       TopicPartition[] topicPartitions() default {};
    
    
    
       /**
    
        *侦听器容器组 
    
        */
    
       String containerGroup() default "";
    
    
    
       /**
    
        * 监听异常处理器,配置BeanName
    
        */
    
       String errorHandler() default "";
    
    
    
       /**
    
        * 消费组ID 
    
        */
    
       String groupId() default "";
    
    
    
       /**
    
        * id是否为GroupId
    
        */
    
       boolean idIsGroup() default true;
    
    
    
       /**
    
        * 消费者Id前缀
    
        */
    
       String clientIdPrefix() default "";
    
    
    
       /**
    
        * 真实监听容器的BeanName,需要在 BeanName前加 "__"
    
        */
    
       String beanRef() default "__listener";
    
    
    
    }
    View Code

    使用案例

    ConsumerRecord类消费

    使用ConsumerRecord类接收有一定的好处,ConsumerRecord类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用ConsumerRecord会是个不错的选择。如果使用具体的类型接收消息体则更加方便,比如说用String类型去接收消息体。

    这里我们编写一个Listener方法,监听"topic1"Topic,并把ConsumerRecord里面所包含的内容打印到控制台中:

    @Component
    
    public class Listener {
    
    
    
        private static final Logger log = LoggerFactory.getLogger(Listener.class);
    
    
    
        @KafkaListener(id = "consumer", topics = "topic1")
    
        public void consumerListener(ConsumerRecord<Integer, String> record) {
    
            log.info("topic.quick.consumer receive : " + record.toString());
    
        }
    
    
    
    }
    View Code

    批量消费

    批量消费在现实业务场景中是很有实用性的。因为批量消费可以增大kafka消费吞吐量,提高性能。

    批量消费实现步骤:

    1、重新创建一份新的消费者配置,配置为一次拉取10条消息

    2、创建一个监听容器工厂,命名为:batchContainerFactory,设置其为批量消费并设置并发量为5,这个并发量根据分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。

    3、创建一个分区数为8的Topic。

    4、创建监听方法,设置消费id为“batchConsumer”,clientID前缀为“batch”,监听“batch”,使用“batchContainerFactory”工厂创建该监听容器。

    @Component
    
    public class BatchListener {
    
    
    
        private static final Logger log= LoggerFactory.getLogger(BatchListener.class);
    
    
    
        private Map<String, Object> consumerProps() {
    
            Map<String, Object> props = new HashMap<>();
    
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    
            //一次拉取消息数量
    
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
    
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    
                    NumberDeserializers.IntegerDeserializer.class);
    
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    
                    StringDeserializer.class);
    
            return props;
    
        }
    
    
    
        @Bean("batchContainerFactory")
    
        public ConcurrentKafkaListenerContainerFactory listenerContainer() {
    
            ConcurrentKafkaListenerContainerFactory container
    
                    = new ConcurrentKafkaListenerContainerFactory();
    
            container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
    
            //设置并发量,小于或等于Topic的分区数
    
            container.setConcurrency(5);
    
            //必须 设置为批量监听
    
            container.setBatchListener(true);
    
            return container;
    
        }
    
    
    
        @Bean
    
        public NewTopic batchTopic() {
    
            return new NewTopic("topic.batch", 8, (short) 1);
    
        }
    
    
    
        @KafkaListener(id = "batchConsumer",clientIdPrefix = "batch"
    
                ,topics = {"topic.batch"},containerFactory = "batchContainerFactory")
    
        public void batchListener(List<String> data) {
    
            log.info("topic.batch  receive : ");
    
            for (String s : data) {
    
                log.info(  s);
    
            }
    
        }
    
    }
    View Code

    监听Topic中指定的分区

    使用@KafkaListener注解的topicPartitions属性监听不同的partition分区。

    @TopicPartition:topic--需要监听的Topic的名称,partitions --需要监听Topic的分区id。

    partitionOffsets --可以设置从某个偏移量开始监听,@PartitionOffset:partition --分区Id,非数组,initialOffset --初始偏移量。

    @Bean
    
    public NewTopic batchWithPartitionTopic() {
    
        return new NewTopic("topic.batch.partition", 8, (short) 1);
    
    }
    
    
    
    @KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",
    
            topicPartitions = {
    
                    @TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}),
    
                    @TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"},
    
                            partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100"))
    
            }
    
    )
    
    public void batchListenerWithPartition(List<String> data) {
    
        log.info("topic.batch.partition  receive : ");
    
        for (String s : data) {
    
            log.info(s);
    
        }
    
    }
    View Code

    注解方式获取消息头及消息体

    当你接收的消息包含请求头,以及你监听方法需要获取该消息非常多的字段时可以通过这种方式。。这里使用的是默认的监听容器工厂创建的,如果你想使用批量消费,把对应的类型改为List即可,比如List<String> data , List<Integer> key。

    @Payload:获取的是消息的消息体,也就是发送内容

    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的key

    @Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的

    @Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的TopicName

    @Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取时间戳

    @KafkaListener(id = "params", topics = "topic.params")
    
    public void otherListener(@Payload String data,
    
                             @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
    
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    
                             @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    
        log.info("topic.params receive : 
    "+
    
                "data : "+data+"
    "+
    
                "key : "+key+"
    "+
    
                "partitionId : "+partition+"
    "+
    
                "topic : "+topic+"
    "+
    
                "timestamp : "+ts+"
    "
    
        );
    
    }
    View Code

    使用Ack机制确认消费

    Kafka是通过最新保存偏移量进行消息消费的,而且确认消费的消息并不会立刻删除,所以我们可以重复的消费未被删除的数据,当第一条消息未被确认,而第二条消息被确认的时候,Kafka会保存第二条消息的偏移量,也就是说第一条消息再也不会被监听器所获取,除非是根据第一条消息的偏移量手动获取。Kafka的ack 机制可以有效的确保消费不被丢失。因为自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。

    使用Kafka的Ack机制比较简单,只需简单的三步即可:

    1. 设置ENABLE_AUTO_COMMIT_CONFIG=false,禁止自动提交
    2. 设置AckMode=MANUAL_IMMEDIATE
    3. 监听方法加入Acknowledgment ack 参数

    4.使用Consumer.seek方法,可以指定到某个偏移量的位置

    @Component
    
    public class AckListener {
    
        private static final Logger log = LoggerFactory.getLogger(AckListener.class);
    
    
    
        private Map<String, Object> consumerProps() {
    
            Map<String, Object> props = new HashMap<>();
    
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            return props;
    
        }
    
    
    
        @Bean("ackContainerFactory")
    
        public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
    
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
    
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
    
            return factory;
    
        }
    
    
    
    
    
        @KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")
    
        public void ackListener(ConsumerRecord record, Acknowledgment ack) {
    
            log.info("topic.quick.ack receive : " + record.value());
    
            ack.acknowledge();
    
        }
    
    }
    View Code

    解决重复消费

    上一节中使用ack手动提交偏移量时,假如consumer挂了重启,那它将从committed offset位置开始重新消费,而不是consume offset位置。这也就意味着有可能重复消费。

    在0.9客户端中,有3种ack策略: 

    策略1: 自动的,周期性的ack。

    策略2:consumer.commitSync(),调用commitSync,手动同步ack。每处理完1条消息,commitSync 1次。

    策略3:consumer. commitASync(),手动异步ack。、

    那么使用策略2,提交每处理完1条消息,就发送一次commitSync。那这样是不是就可以解决“重复消费”了呢?如下代码:

    while (true) {
    
            List<ConsumerRecord> buffer = new ArrayList<>();
    
            ConsumerRecords<String, String> records = consumer.poll(100);
    
            for (ConsumerRecord<String, String> record : records) {
    
                buffer.add(record);
    
            }
    
            insertIntoDb(buffer);    //消除处理,存到db
    
            consumer.commitSync();   //同步发送ack
    
            buffer.clear();
    
        }
    
    }
    View Code

    答案是否定的!因为上面的insertIntoDb和commitSync做不到原子操作:如果在数据处理完成,commitSync的时候挂了,服务器再次重启,消息仍然会重复消费。

             那么如何解决重复消费的问题呢?答案是自己保存committed offset,而不是依赖kafka的集群保存committed offset,把消息的处理和保存offset做成一个原子操作,并且对消息加入唯一id,进行判重。

    依照官方文档,要自己保存偏移量,需要:

    1. enable.auto.commit=false, 禁用自动ack。
    2. 每次取到消息,把对应的offset存下来。
    3. 下次重启,通过consumer.seek函数,定位到自己保存的offset,从那开始消费。
    4. 更进一步处理可以对消息加入唯一id,进行判重。
  • 相关阅读:
    solrcloud
    nginx代理服务器3--高可用(keepalived)
    Nginx反向代理1--基本介绍-虚拟主机
    Nginx反向代理2--配置文件配置
    C/S与B/S区别
    数据类型转换(客户端与服务器端)
    SYN Cookie的原理和实现
    ubuntu 18.04 配置 rc.local
    Summary Checklist for Run-Time Kubernetes Security
    形意拳内功心法
  • 原文地址:https://www.cnblogs.com/HendSame-JMZ/p/12987559.html
Copyright © 2011-2022 走看看