  • Spring Boot Kafka integration (implementing a producer and consumer)

    This article shows how to integrate Kafka into a Spring Boot project to send and receive messages.

    1. First, sort out the dependencies

    We won't go over the general Spring Boot dependencies here; for Kafka, the only extra dependency is the spring-kafka integration package. (spring-kafka releases track specific kafka-clients and broker versions, so pick the release that matches your cluster.)

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>

    Before the code, here is the configuration file. Note that kafka.consumer.zookeeper.connect is listed for completeness but is never actually injected by the consumer configuration below; as tip 3 at the end explains, the consumer connects to the Kafka brokers directly.

    #============== kafka ===================
    kafka.consumer.zookeeper.connect=10.93.21.21:2181
    kafka.consumer.servers=10.93.21.21:9092
    kafka.consumer.enable.auto.commit=true
    kafka.consumer.session.timeout=6000
    kafka.consumer.auto.commit.interval=100
    kafka.consumer.auto.offset.reset=latest
    kafka.consumer.topic=test
    kafka.consumer.group.id=test
    kafka.consumer.concurrency=10
    
    kafka.producer.servers=10.93.21.21:9092
    kafka.producer.retries=0
    kafka.producer.batch.size=4096
    kafka.producer.linger=1
    kafka.producer.buffer.memory=40960

    2. Configuration: Kafka producer

    1) Use @Configuration and @EnableKafka to declare the configuration class and turn on Spring Kafka's annotation support.

    2) Use @Value to inject the kafka settings from application.properties.

    3) Expose the KafkaTemplate with @Bean.

    package com.kangaroo.sentinel.collect.configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {
    
        @Value("${kafka.producer.servers}")
        private String servers;
        @Value("${kafka.producer.retries}")
        private int retries;
        @Value("${kafka.producer.batch.size}")
        private int batchSize;
        @Value("${kafka.producer.linger}")
        private int linger;
        @Value("${kafka.producer.buffer.memory}")
        private int bufferMemory;
    
    
        // Assemble the producer properties injected from application.properties.
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        // Factory that creates Kafka producers from the properties above.
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        // The KafkaTemplate bean that application code uses to send messages.
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    }

    To try out our producer, let's write a Controller that sends the message to topic=test with key=key.

    package com.kangaroo.sentinel.collect.controller;
    
    import com.kangaroo.sentinel.common.response.Response;
    import com.kangaroo.sentinel.common.response.ResultCode;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.*;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    
    @RestController
    @RequestMapping("/kafka")
    public class CollectController {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @RequestMapping(value = "/send", method = RequestMethod.GET)
        public Response sendKafka(HttpServletRequest request, HttpServletResponse response) {
            try {
                String message = request.getParameter("message");
                logger.info("kafka的消息={}", message);
                kafkaTemplate.send("test", "key", message);
                logger.info("发送kafka成功.");
                return new Response(ResultCode.SUCCESS, "发送kafka成功", null);
            } catch (Exception e) {
                logger.error("发送kafka失败", e);
                return new Response(ResultCode.EXCEPTION, "发送kafka失败", null);
            }
        }
    
    }
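
    A caveat worth adding here: KafkaTemplate.send() is asynchronous. It returns as soon as the record is handed to the producer, so the "send succeeded" log above only means the message was queued locally, not that the broker accepted it. Below is a minimal sketch of confirming delivery with a callback, assuming spring-kafka 1.1.x, where send() returns a ListenableFuture<SendResult<K, V>>:

    // Extra imports needed for this sketch:
    // import org.springframework.kafka.support.SendResult;
    // import org.springframework.util.concurrent.ListenableFuture;
    // import org.springframework.util.concurrent.ListenableFutureCallback;

    ListenableFuture<SendResult<String, String>> future =
            kafkaTemplate.send("test", "key", message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            // The broker acknowledged the record; log where it landed.
            logger.info("sent to partition {} at offset {}",
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
        }

        @Override
        public void onFailure(Throwable ex) {
            logger.error("kafka send failed", ex);
        }
    });

    With the application running, the endpoint can be exercised with a plain GET request, e.g. /kafka/send?message=hello.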

    3. Configuration: Kafka consumer

    1) Use @Configuration and @EnableKafka to declare the configuration class and enable @KafkaListener support.

    2) Use @Value to inject the kafka settings from application.properties.

    3) Expose the listener container factory and the listener as @Beans.

    package com.kangaroo.sentinel.collect.configuration;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
        @Value("${kafka.consumer.servers}")
        private String servers;
        @Value("${kafka.consumer.enable.auto.commit}")
        private boolean enableAutoCommit;
        @Value("${kafka.consumer.session.timeout}")
        private String sessionTimeout;
        @Value("${kafka.consumer.auto.commit.interval}")
        private String autoCommitInterval;
        @Value("${kafka.consumer.group.id}")
        private String groupId;
        @Value("${kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;
        @Value("${kafka.consumer.concurrency}")
        private int concurrency;

        // Container factory used by @KafkaListener; each listener container
        // it creates runs 'concurrency' consumer threads.
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(concurrency);
            factory.getContainerProperties().setPollTimeout(1500);
            return factory;
        }
    
        // Factory that creates Kafka consumers from the properties below.
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
    
        // Assemble the consumer properties injected from application.properties.
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            return propsMap;
        }
    
        // Register the message listener as a bean so its @KafkaListener
        // method is detected and attached to a listener container.
        @Bean
        public Listener listener() {
            return new Listener();
        }
    
    }
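
    One note on kafka.consumer.concurrency=10 (my addition, not from the original article): setConcurrency(10) starts 10 consumer threads in the same group, and Kafka assigns each partition of a topic to at most one consumer in a group, so any threads beyond the topic's partition count simply sit idle. A sketch of capping the value inside kafkaListenerContainerFactory() above (maxPartitions is a hypothetical number you would look up for your topic):

    // Hypothetical: cap listener concurrency at the topic's partition count,
    // since extra consumers in the same group would sit idle.
    int maxPartitions = 4; // look this up for your topic
    factory.setConcurrency(Math.min(concurrency, maxPartitions));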

    new Listener() creates the bean that processes the data read from Kafka. A simple demo implementation of Listener follows; it just reads each record and logs its key and value.

    The topics attribute of @KafkaListener specifies the Kafka topic to listen on; the name must match the topic the producer writes to, i.e. the one passed to kafkaTemplate when sending.

    package com.kangaroo.sentinel.collect.configuration;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    
    public class Listener {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    
        // Invoked for every record received on the subscribed topic.
        @KafkaListener(topics = {"test"})
        public void listen(ConsumerRecord<?, ?> record) {
            logger.info("kafka key: {}", record.key());
            logger.info("kafka value: {}", record.value());
        }
    }
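
    Since the topic name is already defined in application.properties (kafka.consumer.topic=test, which the configuration above never actually injects), a small variation worth considering is resolving it with a property placeholder instead of hard-coding "test", so the topic name lives in one place. A sketch of just the annotated method:

    // The topic name is resolved from application.properties at startup.
    @KafkaListener(topics = "${kafka.consumer.topic}")
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("kafka key: {}", record.key());
        logger.info("kafka value: {}", record.value());
    }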

    Tips:

    1) I haven't covered installing and configuring Kafka itself. When configuring the broker, it's best to bind it to a real network IP rather than localhost or 127.0.0.1 (see the sketch after this list).

    2) It's best not to deploy Kafka against the ZooKeeper instance bundled with the Kafka distribution; doing so can lead to connectivity problems.

    3) You might expect the consumer to reach Kafka through ZooKeeper, but here we point it at the Kafka servers themselves. The reason is that the new consumer API (Kafka 0.9+, which spring-kafka uses) connects directly to the brokers via bootstrap.servers and stores its offsets in Kafka rather than in ZooKeeper.

    4) When defining the listener configuration, GROUP_ID_CONFIG names the consumer group. If several listener instances share the same group, each message is received by only one of them (see the sketch after this list).
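
    Two quick illustrations of the tips above. For tip 1, a minimal sketch of the relevant broker settings in server.properties (the IP is the one used throughout this article; property names as of Kafka 0.10.x):

    listeners=PLAINTEXT://10.93.21.21:9092
    advertised.listeners=PLAINTEXT://10.93.21.21:9092

    For tip 4, a hypothetical sketch (not from the original article) of two listeners competing in one consumer group. Registered as a bean the same way as Listener above, both methods inherit the same group.id from the ConsumerFactory, so the topic's partitions are split between them and each record reaches exactly one of the two:

    package com.kangaroo.sentinel.collect.configuration;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;

    public class CompetingListeners {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());

        // Same group.id, same topic: the two methods act as competing
        // consumers, and each record is delivered to only one of them.
        @KafkaListener(topics = {"test"})
        public void listenA(ConsumerRecord<?, ?> record) {
            logger.info("listenA got: {}", record.value());
        }

        @KafkaListener(topics = {"test"})
        public void listenB(ConsumerRecord<?, ?> record) {
            logger.info("listenB got: {}", record.value());
        }
    }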
