zoukankan      html  css  js  c++  java
  • Springboot中使用kafka

    注:kafka消息队列默认采用配置消息主题进行消费,一个topic中的消息只能被同一个组(groupId)的消费者中的一个消费者消费。

    1.在pom.xml的依赖中添加以下kafka依赖jar包

    <!--kafka-->
    <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <version>1.1.1.RELEASE</version>
    </dependency>
    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>0.10.0.1</version>
    </dependency>


    2.在application.properties增加配置:

    #原始数据kafka读取
    # kafka消费集群地址(ip:port,多个地址用逗号分隔)
    kafka.consumer.servers=IP:9092,IP:9092
    # 是否自动提交offset
    kafka.consumer.enable.auto.commit=true
    # 会话超时时间(毫秒)
    kafka.consumer.session.timeout=20000
    # 自动提交间隔(毫秒)
    kafka.consumer.auto.commit.interval=100
    # latest:实时生产,实时消费,不会从头开始消费
    kafka.consumer.auto.offset.reset=latest
    # 消费的topic
    kafka.consumer.topic=result
    # 消费组
    kafka.consumer.group.id=test
    # 消费线程数
    kafka.consumer.concurrency=10
    
    #协议转换后存储kafka
    # kafka生产集群地址(ip:port,多个地址用逗号分隔)
    kafka.producer.servers=IP:9092,IP:9092
    # 生产的topic
    kafka.producer.topic=result
    kafka.producer.retries=0
    kafka.producer.batch.size=4096
    kafka.producer.linger=1
    kafka.producer.buffer.memory=40960


    3.生产者配置类:

    package com.mapbar.track_storage.config;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Kafka producer configuration.
     *
     * <p>Reads the {@code kafka.producer.*} properties and exposes a
     * {@link KafkaTemplate} that sends messages with String keys and String values.
     *
     * @author Lvjiapeng
     */
    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {

        @Value("${kafka.producer.servers}")
        private String servers;

        @Value("${kafka.producer.retries}")
        private int retries;

        @Value("${kafka.producer.batch.size}")
        private int batchSize;

        @Value("${kafka.producer.linger}")
        private int linger;

        @Value("${kafka.producer.buffer.memory}")
        private int bufferMemory;

        /**
         * Builds the native Kafka producer properties from the injected settings.
         *
         * @return a new mutable map of producer configuration entries
         */
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }

        /**
         * Creates the producer factory backing {@link #kafkaTemplate()}.
         *
         * <p>Registered as a bean so that the container manages the factory's
         * lifecycle: the factory's destroy callback is what closes the underlying
         * producer on context shutdown. When the factory is only created inline,
         * that callback is never invoked.
         *
         * @return the producer factory for String/String messages
         */
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        /**
         * Exposes the template used by application code to publish messages.
         *
         * @return a template bound to {@link #producerFactory()}
         */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
    View Code

    4.消费者配置类:

    package com.mapbar.track_storage.config;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Kafka consumer configuration.
     *
     * <p>Reads the {@code kafka.consumer.*} properties, builds the listener
     * container factory used by {@code @KafkaListener} methods and registers the
     * {@link RawDataListener} bean.
     *
     * @author Lvjiapeng
     */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        @Value("${kafka.consumer.servers}")
        private String servers;

        @Value("${kafka.consumer.group.id}")
        private String groupId;

        @Value("${kafka.consumer.enable.auto.commit}")
        private boolean enableAutoCommit;

        @Value("${kafka.consumer.auto.commit.interval}")
        private String autoCommitInterval;

        @Value("${kafka.consumer.session.timeout}")
        private String sessionTimeout;

        @Value("${kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;

        @Value("${kafka.consumer.concurrency}")
        private int concurrency;

        /**
         * Builds the default listener container factory.
         *
         * @return a concurrent container factory using the configured concurrency
         *         and a poll timeout of 1500
         */
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            ConsumerFactory<String, String> consumers = consumerFactory();
            containerFactory.setConsumerFactory(consumers);
            containerFactory.setConcurrency(concurrency);
            containerFactory.getContainerProperties().setPollTimeout(1500);
            return containerFactory;
        }

        /**
         * Creates a consumer factory from {@link #consumerConfigs()}.
         *
         * @return the consumer factory for String/String records
         */
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> settings = consumerConfigs();
            return new DefaultKafkaConsumerFactory<>(settings);
        }

        /**
         * Assembles the native Kafka consumer properties from the injected settings.
         *
         * @return a new mutable map of consumer configuration entries
         */
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> settings = new HashMap<>();

            // Connection and group membership.
            settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);

            // Offset handling.
            settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

            // Keys and values are both plain strings.
            settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            return settings;
        }

        /**
         * Registers the Kafka listener bean.
         *
         * <p>NOTE(review): {@link RawDataListener} is also annotated with
         * {@code @Component}; if it is picked up by component scanning as well, two
         * listener instances would be registered - confirm which registration is
         * intended.
         *
         * @return a new listener instance
         */
        @Bean
        public RawDataListener listener() {
            return new RawDataListener();
        }

    }
    View Code

    5.测试生产者:

    package com.mapbar.track_storage.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.io.IOException;
    
    @RequestMapping(value = "/kafka")
    @Controller
    public class ProducerController {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @RequestMapping(value = "/producer",method = RequestMethod.GET)
    public void consume(HttpServletRequest request, HttpServletResponse response) throws IOException{
    String value = "{"code":200,"dataVersion":"17q1","message":"","id":"364f79f28eea48eefeca8c85477a10d3","source":"didi","tripList":[{"subTripList":[{"startTimeStamp":1519879598,"schemeList":[{"distance":0.0,"ids":"94666702,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879598,"subTripId":0},{"startTimeStamp":1519879727,"schemeList":[{"distance":1395.0,"ids":"94666729,7298838,7291709,7291706,88613298,88613297,7297542,7297541,94698785,94698786,94698778,94698780,94698779,94698782,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879812,"subTripId":1},{"startTimeStamp":1519879836,"schemeList":[{"distance":0.0,"ids":"54123007,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879904,"subTripId":2},{"startTimeStamp":1519879959,"schemeList":[{"distance":0.0,"ids":"54190443,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879959,"subTripId":3},{"startTimeStamp":1519880088,"schemeList":[{"distance":2885.0,"ids":"94698824,94698822,94698789,94698786,54123011,54123012,54123002,94698763,94698727,94698722,94698765,54123006,54123004,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519880300,"subTripId":4},{"startTimeStamp":1519880393,"schemeList":[{"distance":2398.0,"ids":"7309441,7303680,54123061,54123038,7309478,7309477,94698204,94698203,94698273,94698274,94698288,94698296,94698295,94698289,94698310,","schemeId":0,"lin
kList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519880636,"subTripId":5},{"startTimeStamp":1519881064,"schemeList":[{"distance":35.0,"ids":"7309474,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519881204,"subTripId":6},{"startTimeStamp":1519881204,"schemeList":[{"distance":28.0,"ids":"7309476,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519881266,"subTripId":7},{"startTimeStamp":1519881291,"schemeList":[{"distance":463.0,"ids":"7303683,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519881329,"subTripId":8}],"startTimeStamp":1519879350,"unUseTime":1201,"totalTime":2049,"endTimeStamp":1519881399,"tripId":0}]}";
    for (int i = 1; i<=500; i++){
    kafkaTemplate.send("result",value);
    }
    }
    }
    View Code

    6.测试消费者:

    import net.sf.json.JSONObject;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.List;
    
    /**
     * Kafka listener for raw data messages.
     *
     * @author shangzz
     */
    @Component
    public class RawDataListener {

        // One shared logger per class instead of one per instance.
        private static final Logger logger = Logger.getLogger(RawDataListener.class);

        // Injected but not used by the listener method below.
        @Autowired
        private MatchRoadService matchRoadService;

        /**
         * Receives records from the configured topic as they arrive and logs each
         * record's value.
         *
         * @param record the consumed Kafka record
         * @throws IOException declared for signature compatibility; not thrown by this body
         */
        @KafkaListener(topics = {"${kafka.consumer.topic}"})
        public void listen(ConsumerRecord<?, ?> record) throws IOException {
            // String.valueOf avoids an unchecked cast and renders a null value as "null".
            String value = String.valueOf(record.value());
            // Goes through the class logger rather than System.out.
            logger.info(value);
        }

    }
    View Code


    总结:

             ①  生产者环境类配置好以后,@Autowired自动注入KafkaTemplate类,使用send方法生产消息

             ②  消费者环境类配置好以后,方法头前使用@KafkaListener(topics = {"${kafka.consumer.topic}"})注解监听topic并传入ConsumerRecord<?, ?> record对象即可自动消费topic

             ③  相关kafka配置只需在application.properties照葫芦画瓢添加,修改或者删除配置并在环境配置类中做出相应修改即可

    二:怎么实现让一个topic可以让不同group消费呢

    groupId不要用配置文件配置的方式,细心的话,会发现@KafkaListener 注解,里面有一个containerFactory参数,就是让你指定容器工厂的

    栗子:

    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    /**
     * Consumer configuration that exposes two listener container factories, each
     * bound to its own consumer group, so the same topic can be consumed
     * independently by both groups.
     */
    @Configuration
    public class KafkaConsumerConfig {

        private String brokers = "192.168.52.130:9092,192.168.52.131:9092,192.168.52.133:9092";

        private String group1 = "test1";
        private String group2 = "test2";

        /**
         * Container factory whose consumers join the first group.
         *
         * @return the container factory backed by {@link #consumerFactory1()}
         */
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
            return newContainerFactory(consumerFactory1());
        }

        /**
         * Container factory whose consumers join the second group.
         *
         * @return the container factory backed by {@link #consumerFactory2()}
         */
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
            return newContainerFactory(consumerFactory2());
        }

        /**
         * Builds the consumer properties shared by both groups.
         *
         * <p>The group id is preset to the first group here; both consumer factory
         * methods of this class overwrite it with their own group.
         *
         * @return a new mutable map of consumer configuration entries
         */
        public Map<String, Object> getCommonPropertis() {
            Map<String, Object> common = new HashMap<String, Object>();
            common.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            common.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
            common.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            common.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            common.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            common.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            common.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            common.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return common;
        }

        /**
         * Creates the consumer factory for the first group.
         *
         * @return a consumer factory configured with the first group id
         */
        public ConsumerFactory<String, String> consumerFactory1() {
            return consumerFactoryForGroup(group1);
        }

        /**
         * Creates the consumer factory for the second group.
         *
         * @return a consumer factory configured with the second group id
         */
        public ConsumerFactory<String, String> consumerFactory2() {
            return consumerFactoryForGroup(group2);
        }

        /** Wraps a consumer factory in a container factory with concurrency 4 and poll timeout 4000. */
        private ConcurrentKafkaListenerContainerFactory<String, String> newContainerFactory(
                ConsumerFactory<String, String> consumers) {
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                    new ConcurrentKafkaListenerContainerFactory<String, String>();
            containerFactory.setConsumerFactory(consumers);
            containerFactory.setConcurrency(4);
            containerFactory.getContainerProperties().setPollTimeout(4000);
            return containerFactory;
        }

        /** Builds a consumer factory from the common properties with the given group id applied. */
        private ConsumerFactory<String, String> consumerFactoryForGroup(String groupId) {
            Map<String, Object> settings = getCommonPropertis();
            settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            return new DefaultKafkaConsumerFactory<String, String>(settings);
        }
    }
    View Code

    最后,在@KafkaListener 中指定容器名称

    @KafkaListener(id="test1",topics = "test-topic", containerFactory="kafkaListenerContainerFactory1")
    @KafkaListener(id="test2",topics = "test-topic", containerFactory="kafkaListenerContainerFactory2")
    高版本 在@KafkaListener 注解中有groupId属性可以设置


    --------------------------------------------------------------------------------------------------
    转载:https://blog.csdn.net/lv_1093964643/article/details/83177280

  • 相关阅读:
    【整理】【代码的坏味道】过长函数(Long Method)
    【整理】【代码的坏味道】重复代码(Duplicated Code)
    【原创】Winform下拉框自动选择实现
    time及各种cpu时间
    arch安装及配置xfce4桌面
    paste工具
    十分有用的cut剪切命令
    ubuntu一些脚本的执行顺序
    Linux一些经典书籍
    强大的wget下载工具
  • 原文地址:https://www.cnblogs.com/lucas1024/p/9948067.html
Copyright © 2011-2022 走看看