zoukankan      html  css  js  c++  java
  • SpringBoot接入两套kafka集群

    引入依赖

      compile 'org.springframework.kafka:spring-kafka'
    

    第一套kafka配置

    package myapp.kafka;
    
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.*;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Configuration for the first (default) Kafka cluster, "k1".
     *
     * <p>Declares the consumer factory, listener container factory, producer factory
     * and {@link KafkaTemplate} for this cluster. All beans are marked {@code @Primary}
     * so that injection points which do not name a specific bean resolve to the k1
     * cluster by default.
     *
     * @author zhengqian
     */
    @Slf4j
    @Configuration
    @Data
    public class K1KafkaConfiguration {

        /** Broker list used by k1 consumers. */
        @Value("${app-name.kafka.k1.consumer.bootstrap-servers}")
        private String consumerBootstrapServers;

        /** Consumer group id for k1 listeners. */
        @Value("${app-name.kafka.k1.consumer.group-id}")
        private String groupId;

        /** Offset reset policy (e.g. "earliest") when no committed offset exists. */
        @Value("${app-name.kafka.k1.consumer.auto-offset-reset}")
        private String autoOffsetReset;

        /** Whether offsets are committed automatically by the consumer. */
        @Value("${app-name.kafka.k1.consumer.enable-auto-commit}")
        private Boolean enableAutoCommit;

        // BUG FIX: previously read "app-name.kafka.k2.producer.bootstrap-servers",
        // which pointed k1 producers at the k2 cluster.
        /** Broker list used by k1 producers. */
        @Value("${app-name.kafka.k1.producer.bootstrap-servers}")
        private String producerBootstrapServers;

        /**
         * Default listener container factory; used by {@code @KafkaListener}
         * methods that do not specify {@code containerFactory}.
         */
        @Bean
        @Primary
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }

        /** Consumer factory for the k1 cluster. */
        @Bean
        @Primary
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        /** Raw consumer properties for the k1 cluster (String key/value deserialization). */
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }

        /** Raw producer properties for the k1 cluster (String key/value serialization). */
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            // Use ProducerConfig (not ConsumerConfig) for producer properties;
            // the constant values happen to match, but this is the correct API.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }

        /** Producer factory for the k1 cluster. */
        @Bean
        @Primary
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        /** Template for sending to the k1 cluster; the default KafkaTemplate bean. */
        @Bean
        @Primary
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
    
    

    第二套kafka配置

    package myapp.kafka;
    
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.*;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Configuration for the second Kafka cluster, "k2".
     *
     * <p>Mirrors {@code K1KafkaConfiguration} but binds to the
     * {@code app-name.kafka.k2.*} properties. None of these beans are
     * {@code @Primary}: the k1 configuration owns the defaults, and callers
     * select this cluster explicitly by bean name (e.g.
     * {@code @KafkaListener(containerFactory = "kafkaListenerContainerFactoryK2")}).
     *
     * @author zhengqian
     */
    @Slf4j
    @Configuration
    @Data
    public class K2KafkaConfiguration {

        /** Broker list used by k2 consumers. */
        @Value("${app-name.kafka.k2.consumer.bootstrap-servers}")
        private String consumerBootstrapServers;

        /** Consumer group id for k2 listeners. */
        @Value("${app-name.kafka.k2.consumer.group-id}")
        private String groupId;

        /** Offset reset policy (e.g. "earliest") when no committed offset exists. */
        @Value("${app-name.kafka.k2.consumer.auto-offset-reset}")
        private String autoOffsetReset;

        /** Whether offsets are committed automatically by the consumer. */
        @Value("${app-name.kafka.k2.consumer.enable-auto-commit}")
        private Boolean enableAutoCommit;

        /** Broker list used by k2 producers. */
        @Value("${app-name.kafka.k2.producer.bootstrap-servers}")
        private String producerBootstrapServers;

        /**
         * Listener container factory for the k2 cluster.
         *
         * <p>BUG FIX: {@code @Primary} removed — the k1 configuration already
         * declares a primary bean of this type, and two primary candidates of
         * the same type make by-type injection fail at startup.
         */
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryK2() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactoryK2());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }

        /** Consumer factory for the k2 cluster. */
        @Bean
        public ConsumerFactory<Integer, String> consumerFactoryK2() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigsK2());
        }

        /** Raw consumer properties for the k2 cluster (String key/value deserialization). */
        @Bean
        public Map<String, Object> consumerConfigsK2() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }

        /** Raw producer properties for the k2 cluster (String key/value serialization). */
        @Bean
        public Map<String, Object> producerConfigsK2() {
            Map<String, Object> props = new HashMap<>();
            // Use ProducerConfig (not ConsumerConfig) for producer properties;
            // the constant values happen to match, but this is the correct API.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }

        /** Producer factory for the k2 cluster. */
        @Bean
        public ProducerFactory<String, String> producerFactoryK2() {
            return new DefaultKafkaProducerFactory<>(producerConfigsK2());
        }

        /** Template for sending to the k2 cluster; inject by name {@code kafkaTemplateK2}. */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplateK2() {
            return new KafkaTemplate<>(producerFactoryK2());
        }
    }
    

    配置文件

    app-name: 
      kafka:
        k1:
          consumer:
            bootstrap-servers: host1:9092
            group-id: my-app
            auto-offset-reset: earliest
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            enable-auto-commit: true
          producer:
            bootstrap-servers: host1:9092
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        k2:
          consumer:
            bootstrap-servers: host2:9092
            group-id: my-app
            auto-offset-reset: earliest
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            enable-auto-commit: true
          producer:
            bootstrap-servers: host2:9092
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    指定消费的kafka集群

        /**
         * Consumes records from the k2 cluster: {@code containerFactory} names the
         * K2 listener container factory bean instead of the default (k1) one.
         */
        @KafkaListener(topics = "topic-name", containerFactory = "kafkaListenerContainerFactoryK2")
        public void onEvent(ConsumerRecord<String, String> record) {
            // omitted
        }
    

    指定生产者发送的kafka集群

    /**
     * Demonstrates sending to a specific cluster: with two KafkaTemplate beans in
     * the context, by-type injection falls back to matching the field name, so
     * this field resolves to the bean named {@code kafkaTemplate} (the k1 cluster).
     */
    public class KafkaTest {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        @Test
        public void test() {
            // Fire one record, then wait briefly for the broker acknowledgement.
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "data");
            try {
                SendResult<String, String> sendResult = future.get(2, TimeUnit.SECONDS);
                System.out.println(sendResult.getProducerRecord());
                System.out.println(sendResult.getRecordMetadata());
            } catch (Exception e) {
                // Best-effort demo code: just dump the failure.
                e.printStackTrace();
            }
        }
    }
    
  • 相关阅读:
    Java实现 洛谷 P1049 装箱问题
    (Java实现) 洛谷 P1781 宇宙总统
    (Java实现) 洛谷 P1319 压缩技术
    (Java实现) 蓝桥杯 国赛 重复模式
    qt编写一个只能运行单个实例的程序,不用Windows API
    Chaos Software Google Sync v10.1.1.0 和Syncovery Pro
    C++中new和delete的背后( call edx 调用虚表内的第二个函数(析构函数))
    C++中实现回调机制的几种方式(一共三种方法,另加三种)
    如何将Icon转成Bitmap(对ICON的内部格式讲的比较清楚)
    深入解析控制器运行原理
  • 原文地址:https://www.cnblogs.com/ylty/p/13673357.html
Copyright © 2011-2022 走看看