zoukankan      html  css  js  c++  java
  • SpringKakfa——消费者消费消息方式

    SpringKafka消费

    单线程自动提交

    config

    
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
        //单线程-单条消费  自动提交位移
        @Bean
        public KafkaListenerContainerFactory stringKafkaListenerContainerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bk1:9092,bk2:9092,bk3:9092");
            configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic");
            configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
            return factory;
        }
    }
    

    receiver

    @Component
    public class KafkaReceiver {
    
        private static Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
    
        //通过自动提交位移
        @KafkaListener(topics = "orderInfo", containerFactory = "stringKafkaListenerContainerFactory")
        public void receive1(ConsumerRecord consumerRecord) {
            SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            logger.info(String.format("time %s recived  message from topic %s  partition %d : %s",sim.format( new Date(consumerRecord.timestamp())),consumerRecord.topic(),consumerRecord.partition(), consumerRecord.value()));
        }
    }
    

    单线程手动提交

    config

        /**
         * 单线程-单条消费  手动提交位移
         */
        @Bean
        public KafkaListenerContainerFactory stringKafkaListenerContainerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bk1:9092,bk2:9092,bk3:9092");
            configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic");
            configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
            return factory;
        }
    

    receiver

        //通过手动提交位移
        @KafkaListener(topics = "orderInfo", containerFactory = "stringKafkaListenerContainerFactory")
        public void receive2(ConsumerRecord consumerRecord,Acknowledgment acknowledgment) {
            SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            logger.info(String.format("time %s recived  message from topic %s  partition %d : %s",sim.format( new Date(consumerRecord.timestamp())),consumerRecord.topic(),consumerRecord.partition(), consumerRecord.value()));
            acknowledgment.acknowledge();
        }
    

    批量消费自动提交

    config

    @Configuration
    @EnableKafka
    public class BatchConsumerConfig {
    
        /**
         * 多线程-批量消费  自动提交位移
         */
       @Bean
        public KafkaListenerContainerFactory batchFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bk1:9092,bk2:9092,bk3:9092");
            configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic");
            // 批量消费消息数量
            configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
            // 自动提交偏移量
            // 如果设置成true,偏移量由auto.commit.interval.ms控制自动提交的频率
            // 如果设置成false,不需要定时的提交offset,可以自己控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。
            // 这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
            configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            // 自动提交的频率
            configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            // Session超时设置
            configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            // 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
            // latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
            // earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
            configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
            // 控制多线程消费,并发数(如果topic有3各分区。设置成3,并发数就是3个线程,加快消费), 不设置setConcurrency就会变成单线程配置, MAX_POLL_RECORDS_CONFIG也会失效,接收的消息列表也不会是ConsumerRecord
            factory.setConcurrency(10);
            // poll超时时间
            factory.getContainerProperties().setPollTimeout(1500);
            // 控制批量消费
            // 设置为批量消费,每个批次数量在Kafka配置参数中设置(max.poll.records)
            factory.setBatchListener(true);
            return factory;
        }
    }
    

    receiver

         //批量消息 自动提交
        @KafkaListener(topics = "orderInfo", containerFactory="batchFactory")
        public void consumerBatch1(List<ConsumerRecord<?, ?>> records){
            logger.info("接收到消息数量:{}",records.size());
            for(ConsumerRecord record: records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                logger.info("Received: " + record);
                if (kafkaMessage.isPresent()) {
                    Object message = record.value();
                    String topic = record.topic();
                    System.out.println("接收到消息:" + message);
                }
            }
        }
    

    批量消费手动提交

    config

        /**
         * 多线程-批量消费  手动提交位移
         */
        @Bean
        public KafkaListenerContainerFactory batchFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bk1:9092,bk2:9092,bk3:9092");
            configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic");
            // 批量消费消息数量
            configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
            // 自动提交偏移量
            // 如果设置成true,偏移量由auto.commit.interval.ms控制自动提交的频率
            // 如果设置成false,不需要定时的提交offset,可以自己控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。
            // 这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
            configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            // 自动提交的频率
            configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            // Session超时设置
            configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            // 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
            // latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
            // earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
            configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
            // 控制多线程消费,并发数(如果topic有3各分区。设置成3,并发数就是3个线程,加快消费), 不设置setConcurrency就会变成单线程配置, MAX_POLL_RECORDS_CONFIG也会失效,接收的消息列表也不会是ConsumerRecord
            factory.setConcurrency(10);
            // poll超时时间
            factory.getContainerProperties().setPollTimeout(1500);
            // 控制批量消费
            // 设置为批量消费,每个批次数量在Kafka配置参数中设置(max.poll.records)
            factory.setBatchListener(true);
    
    
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
            return factory;
        }
    

    receiver

        //批量消息 手动提交
        @KafkaListener(topics = "orderInfo", containerFactory="batchFactory")
        public void consumerBatch2(List<ConsumerRecord<?, ?>> records,Acknowledgment acknowledgment){
            logger.info("接收到消息数量:{}",records.size());
            for(ConsumerRecord record: records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                logger.info("Received: " + record);
                if (kafkaMessage.isPresent()) {
                    Object message = record.value();
                    String topic = record.topic();
                    System.out.println("接收到消息:" + message);
                }
            }
            acknowledgment.acknowledge();
        }
    
  • 相关阅读:
    字符,字节和编码
    Linux网络参数和ifconfig
    默认网关 网关 子网掩码 广播地址
    S.M.A.R.T.记录几块ssd硬盘
    linux 别名
    echo 输出颜色
    Linux:echo命令详解
    centos下安装mongodb 通过shell脚本
    linux查看登录用户
    linux wget指定下载目录和重命名
  • 原文地址:https://www.cnblogs.com/luckyhui28/p/12582060.html
Copyright © 2011-2022 走看看