zoukankan      html  css  js  c++  java
  • kafka consumerFactory 配置多个

    1.消费者配置
    一个kafka项目中,消费者配置可以存在多份,consumerFactory可以写多个,同时对应新写consumerListener来引用新加的配置

    package com.wing.springbootkafka.config;
    
    import com.wing.springbootkafka.listener.Listener;
    import com.wing.springbootkafka.listener.Listener2;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
        @Value("${kafka.consumer.servers}")
        private String servers;
        @Value("${kafka.consumer.enable.auto.commit}")
        private boolean enableAutoCommit;
        @Value("${kafka.consumer.session.timeout}")
        private String sessionTimeout;
        @Value("${kafka.consumer.auto.commit.interval}")
        private String autoCommitInterval;
        @Value("${kafka.consumer.group.id}")
        private String groupId;
        @Value("${kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;
        @Value("${kafka.consumer.concurrency}")
        private int concurrency;
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(concurrency);
            factory.getContainerProperties().setPollTimeout(1500);
            return factory;
        }
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new
                    ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory2());
            factory.setConcurrency(concurrency);
            factory.getContainerProperties().setPollTimeout(1500);
            return factory;
        }
    
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        public ConsumerFactory<String, String> consumerFactory2() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs2());
        }
    
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            return propsMap;
        }
    
        public Map<String, Object> consumerConfigs2() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            return propsMap;
        }
    
        @Bean
        public Listener getListener(){
            return new Listener();
        }
    
        @Bean
        public Listener2 getListener2(){
            return new Listener2();
        }
    }
    

    2.监听代码

    package com.wing.springbootkafka.listener;
    import com.wing.springbootkafka.modle.LocationReport;
    import com.wing.springbootkafka.utils.JsonUtil;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import java.io.UnsupportedEncodingException;
    import java.util.Optional;
    /**
     * @ClassName Listener2
     * @Description 测试支持两个配置
     * @createTime 2019年04月04日 16:25:00
     */
    public class Listener2 {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @KafkaListener(topics = {"test2"}, containerFactory = "kafkaListenerContainerFactory2")
        public void listen(ConsumerRecord<String, byte[]> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            logger.info(">>>>>>>>>> record =" + kafkaMessage);
            if(kafkaMessage.isPresent()){
                //得到Optional实例中的值
                Object message = kafkaMessage.get();
                logger.info("kafka的key2: " + record.key());
                logger.info("kafka的value2: " + record.value());
                try {
                    String json = new String(record.value(), "UTF-8");
                    LocationReport report = JsonUtil.fromJson(json, LocationReport.class);
                    logger.info("{}", report.toString());
                } catch (UnsupportedEncodingException e) {
                    logger.error("UnsupportedEncodingException error={}", e);
                }
            }
        }
    }
    
    
  • 相关阅读:
    react中this.setState的理解
    expo:wraning remotedebugger is in a...cause apps to perform slowly
    expo:java.net.socketExcrption:No route to host
    redux的中间件
    js中this的指向
    微信小程序之模板/组件的使用
    js判断手机端
    微信小程序轮播图
    scrollbar样式设置
    CSS绝对定位元素居中的几种方法
  • 原文地址:https://www.cnblogs.com/yaozhixiang/p/15099207.html
Copyright © 2011-2022 走看看