  • Spring Boot Kafka

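    Kafka's core concepts, and how it ensures high availability, reliability, and message ordering, are not covered here. This article only records working examples.

    Dependencies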

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    application.yml

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: message # consumer group
          auto-offset-reset: latest # if a partition has a committed offset, consume from that offset; if it has none, consume only records produced to that partition from now on
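
    The comment on auto-offset-reset describes a per-partition decision. As a rough illustration only, the plain-Java sketch below models that rule. OffsetResetSketch, ResetPolicy, and startingOffset are hypothetical names made up for this example and are not part of the Kafka client API; the sketch also ignores the case where a committed offset has fallen out of range.

```java
import java.util.OptionalLong;

// Hypothetical model of the auto-offset-reset rule; this is not Kafka client code.
class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * Picks the offset a consumer group would start reading from in one partition.
     *
     * @param committed last offset committed by the group, empty if none exists
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset that the next produced record will receive
     * @param policy    value of auto-offset-reset (earliest or latest)
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            // A committed offset wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        // No committed offset: earliest replays the partition, latest waits for new records.
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }
}
```

    With latest, a brand-new consumer group therefore skips everything already in the topic, which is worth remembering when a first test message seems to go missing.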
    

    KafkaProducerConfig

    The producer below sends objects of the Java class MessageForm; replace it with your own class.

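    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonSerializer;

    @Configuration
    public class KafkaProducerConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        // Keys are sent as plain strings, values as JSON.
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

            return props;
        }

        @Bean
        public ProducerFactory<String, MessageForm> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public KafkaTemplate<String, MessageForm> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }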
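
    MessageForm itself is not shown in this article. A minimal sketch of what such a payload class might look like follows, assuming two purely illustrative fields, sender and content (both hypothetical). The no-argument constructor plus getters and setters is the plain JavaBean shape that Jackson, which backs JsonSerializer and JsonDeserializer, can map without extra annotations.

```java
// Hypothetical payload class; replace the fields with your own.
// In a real project, declare it public in its own file.
class MessageForm {

    private String sender;
    private String content;

    // No-argument constructor so the JSON library can instantiate the class.
    public MessageForm() {
    }

    public MessageForm(String sender, String content) {
        this.sender = sender;
        this.content = content;
    }

    public String getSender() {
        return sender;
    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "MessageForm{sender='" + sender + "', content='" + content + "'}";
    }
}
```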
    

    KafkaConsumerConfig

    The configuration class needs the @EnableKafka annotation so that @KafkaListener annotations on Spring-managed beans are detected.

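    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

            return props;
        }

        @Bean
        public ConsumerFactory<String, MessageForm> consumerFactory() {
            // The deserializer instances passed here are the ones actually used;
            // the JsonDeserializer is told to map values to MessageForm.
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                    new JsonDeserializer<>(MessageForm.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, MessageForm> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, MessageForm> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());

            return factory;
        }
    }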
    

    KafkaMessageListener

    In KafkaConsumerConfig we configured a ConsumerFactory and a KafkaListenerContainerFactory. Once these two beans are registered in the Spring IoC container, we can use the @KafkaListener annotation to listen for messages.

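    import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class KafkaMessageListener {

        @Resource
        private MessageService messageServiceImpl;

        // groupId set here overrides any group.id configured on the consumer factory.
        @KafkaListener(topics = "message", groupId = "consumer")
        public void listen(MessageForm messageForm) {
            messageServiceImpl.message(messageForm);
        }
    }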
    
  • Original article: https://www.cnblogs.com/zenan/p/10893640.html