zoukankan      html  css  js  c++  java
  • Springboot中使用kafka

    注:kafka消息队列默认采用配置消息主题进行消费,一个topic中的消息只能被同一个组(groupId)的消费者中的一个消费者消费。

    1.在pom.xml依赖下新添加一下kafka依赖ar包

    <!--kafka-->
    <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <version>1.1.1.RELEASE</version>
    </dependency>
    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>0.10.0.1</version>
    </dependency>


    2.在application.properties增加配置:

    #原始数据kafka读取
    kafka.consumer.servers=IP:9092,IP:9092(kafka消费集群ip+port端口)
    kafka.consumer.enable.auto.commit=true(是否自动提交)
    kafka.consumer.session.timeout=20000(连接超时时间)
    kafka.consumer.auto.commit.interval=100
    kafka.consumer.auto.offset.reset=latest(实时生产,实时消费,不会从头开始消费)
    kafka.consumer.topic=result(消费的topic)
    kafka.consumer.group.id=test(消费组)
    kafka.consumer.concurrency=10(设置消费线程数)
    
    #协议转换后存储kafka
    kafka.producer.servers=IP:9092,IP:9092(kafka生产集群ip+port端口)
    kafka.producer.topic=result(生产的topic)
    kafka.producer.retries=0
    kafka.producer.batch.size=4096
    kafka.producer.linger=1
    kafka.producer.buffer.memory=40960


    3.生产者配置类:

    package com.mapbar.track_storage.config;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
    * kafka生产配置
    * @author Lvjiapeng
    *
    */
    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;
    
    public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ProducerConfig.RETRIES_CONFIG, retries);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
    }
    
    public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<String, String>(producerFactory());
    }
    }
    View Code

    4.消费者配置类:

    package com.mapbar.track_storage.config;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
    * kafka消费者配置
    * @author Lvjiapeng
    *
    */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
    }
    
    public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    
    public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    return propsMap;
    }
    /**
    * kafka监听
    * @return
    */
    @Bean
    public RawDataListener listener() {
    return new RawDataListener();
    }
    
    }
    View Code

    5.测试生产者:

    package com.mapbar.track_storage.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.io.IOException;
    
    @RequestMapping(value = "/kafka")
    @Controller
    public class ProducerController {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @RequestMapping(value = "/producer",method = RequestMethod.GET)
    public void consume(HttpServletRequest request, HttpServletResponse response) throws IOException{
    String value = "{"code":200,"dataVersion":"17q1","message":"","id":"364f79f28eea48eefeca8c85477a10d3","source":"didi","tripList":[{"subTripList":[{"startTimeStamp":1519879598,"schemeList":[{"distance":0.0,"ids":"94666702,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879598,"subTripId":0},{"startTimeStamp":1519879727,"schemeList":[{"distance":1395.0,"ids":"94666729,7298838,7291709,7291706,88613298,88613297,7297542,7297541,94698785,94698786,94698778,94698780,94698779,94698782,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879812,"subTripId":1},{"startTimeStamp":1519879836,"schemeList":[{"distance":0.0,"ids":"54123007,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879904,"subTripId":2},{"startTimeStamp":1519879959,"schemeList":[{"distance":0.0,"ids":"54190443,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879959,"subTripId":3},{"startTimeStamp":1519880088,"schemeList":[{"distance":2885.0,"ids":"94698824,94698822,94698789,94698786,54123011,54123012,54123002,94698763,94698727,94698722,94698765,54123006,54123004,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519880300,"subTripId":4},{"startTimeStamp":1519880393,"schemeList":[{"distance":2398.0,"ids":"7309441,7303680,54123061,54123038,7309478,7309477,94698204,94698203,94698273,94698274,94698288,94698296,94698295,94698289,94698310,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519880636,"subTripId":5},{"startTimeStamp":1519881064,"schemeList":[{"distance":35.0,"ids":"7309474,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519881204,"subTripId":6},{"startTimeStamp":1519881204,"schemeList":[{"distance":28.0,"ids":"7309476,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519881266,"subTripId":7},{"startTimeStamp":1519881291,"schemeList":[{"distance":463.0,"ids":"7303683,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519881329,"subTripId":8}],"startTimeStamp":1519879350,"unUseTime":1201,"totalTime":2049,"endTimeStamp":1519881399,"tripId":0}]}";
    for (int i = 1; i<=500; i++){
    kafkaTemplate.send("result",value);
    }
    }
    }
    View Code

    6.测试消费者:

    import net.sf.json.JSONObject;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.List;
    
    /**
    * kafka监听
    * @author shangzz
    *
    */
    @Component
    public class RawDataListener {
    Logger logger=Logger.getLogger(RawDataListener.class);
    @Autowired
    private MatchRoadService matchRoadService;
    
    /**
    * 实时获取kafka数据(生产一条,监听生产topic自动消费一条)
    * @param record
    * @throws IOException
    */
    @KafkaListener(topics = {"${kafka.consumer.topic}"})
    public void listen(ConsumerRecord<?, ?> record) throws IOException {
    String value = (String) record.value();
    System.out.println(value);
    }
    
    }
    View Code


    总结:

             ①  生产者环境类配置好以后,@Autowired自动注入KafkaTemplate类,使用send方法生产消息

             ②  消费者环境类配置好以后,方法头前使用@KafkaListener(topics = {"${kafka.consumer.topic}"})注解监听topic并传入ConsumerRecord<?, ?> record对象即可自动消费topic

             ③  相关kafka配置只需在application.properties照葫芦画瓢添加,修改或者删除配置并在环境配置类中做出相应修改即可

    二:怎么实现让一个topic可以让不同group消费呢

    goupid不要用配置文件配置的方式,细心的话,会发现@KafkaListener 注解,里面有一个containerFactory参数,就是让你指定容器工厂的

    栗子:

    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    @Configuration
    public class KafkaConsumerConfig {
    
    private String brokers = "192.168.52.130:9092,192.168.52.131:9092,192.168.52.133:9092";
    
    private String group1 = "test1";
    private String group2 = "test2";
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory1());
    factory.setConcurrency(4);
    factory.getContainerProperties().setPollTimeout(4000);
    return factory;
    }
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory2());
    factory.setConcurrency(4);
    factory.getContainerProperties().setPollTimeout(4000);
    return factory;
    }
    
    public Map<String, Object> getCommonPropertis() {
    Map<String, Object> properties = new HashMap<String, Object>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return properties;
    }
    
    
    public ConsumerFactory<String, String> consumerFactory1() {
    Map<String, Object> properties = getCommonPropertis();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
    return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
    
    public ConsumerFactory<String, String> consumerFactory2() {
    Map<String, Object> properties = getCommonPropertis();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);
    return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
    }
    View Code

    最后,在@KafkaListener 中指定容器名称

    @KafkaListener(id="test1",topics = "test-topic", containerFactory="kafkaListenerContainerFactory1")
    @KafkaListener(id="test2",topics = "test-topic", containerFactory="kafkaListenerContainerFactory2")
    高版本 在@KafkaListener 注解中有groupId属性可以设置


    --------------------------------------------------------------------------------------------------
    转载:https://blog.csdn.net/lv_1093964643/article/details/83177280

  • 相关阅读:
    软件体系架构复习要点
    Operating System on Raspberry Pi 3b
    2019-2020 ICPC North-Western Russia Regional Contest
    2019 ICPC ShenYang Regional Online Contest
    2019 ICPC XuZhou Regional Online Contest
    2019 ICPC NanChang Regional Online Contest
    2019 ICPC NanJing Regional Online Contest
    Codeforces Edu Round 72 (Rated for Div. 2)
    Codeforces Round #583 (Div.1+Div.2)
    AtCoder Beginning Contest 139
  • 原文地址:https://www.cnblogs.com/lucas1024/p/9948067.html
Copyright © 2011-2022 走看看