zoukankan      html  css  js  c++  java
  • Spring Boot系列 八、集成Kafka

    一、引入依赖

     <!-- Spring for Apache Kafka: supplies KafkaTemplate and the producer/consumer factory classes used below. -->
     <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
         <version>2.4.0.RELEASE</version>
     </dependency>

    二、编写配置文件

    spring:
      application:
        # Spring Boot takes the application name from spring.application.name;
        # a bare "spring.application: kafka" is not bound to anything.
        name: kafka
      kafka:
        # Placeholder: replace "ip" with the real broker host (comma-separate several brokers).
        bootstrap-servers: ip:9092
        producer:
          # Number of times a failed send is retried.
          retries: 1
          # Upper bound, in bytes, of one batch of records per partition.
          batch-size: 16384
          # Total bytes the producer may use to buffer records waiting to be sent.
          buffer-memory: 1024000
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          # Offsets are committed automatically in the background.
          enable-auto-commit: true
          # Passed as-is to auto.commit.interval.ms, so this is 10 milliseconds.
          # Keep it a plain number: the value is injected as a String and handed to Kafka unparsed.
          auto-commit-interval: 10
          # With no committed offset, start from the earliest available record.
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    三、注入生产者

    /**
     * Producer-side Kafka configuration.
     *
     * <p>Reads the {@code spring.kafka.*} producer properties and exposes a
     * {@link KafkaTemplate} bean named {@code peopleTemplate} whose default topic is
     * {@code topic.people}.
     */
    @Configuration
    public class ProductorConfiugutration {

        /** Broker address list ({@code spring.kafka.bootstrap-servers}). */
        @Value("${spring.kafka.bootstrap-servers}")
        private String server;

        /** Producer buffer size ({@code spring.kafka.producer.buffer-memory}). */
        @Value("${spring.kafka.producer.buffer-memory}")
        private Integer bufferMemory;

        /** Producer batch size ({@code spring.kafka.producer.batch-size}). */
        @Value("${spring.kafka.producer.batch-size}")
        private Integer batchSize;

        /** Retry count for failed sends ({@code spring.kafka.producer.retries}). */
        @Value("${spring.kafka.producer.retries}")
        private int retries;

        /** Fully qualified key serializer class name. */
        @Value("${spring.kafka.producer.key-serializer}")
        private String keySerializer;

        /** Fully qualified value serializer class name. */
        @Value("${spring.kafka.producer.value-serializer}")
        private String valueSerializer;

        /**
         * Creates the template used to publish messages to {@code topic.people}.
         *
         * @return a template backed by {@link #getFactory()} with its default topic set
         */
        @Bean("peopleTemplate")
        public KafkaTemplate<String, String> getPeopleTemplate() {
            KafkaTemplate<String, String> template = new KafkaTemplate<>(getFactory());
            template.setDefaultTopic("topic.people");
            return template;
        }

        /**
         * Builds a producer factory from {@link #getProductorConfig()}.
         *
         * <p>The factory is parameterized (it was a raw type before) so that passing it to
         * {@code KafkaTemplate<String, String>} no longer needs an unchecked conversion.
         *
         * <p>NOTE(review): this method is not a {@code @Bean}, so every call builds a new
         * factory and the container does not manage it - confirm that is intended.
         *
         * @return a new producer factory for String keys and String values
         */
        public ProducerFactory<String, String> getFactory() {
            return new DefaultKafkaProducerFactory<>(getProductorConfig());
        }

        /**
         * Collects the injected settings into the property map Kafka expects.
         *
         * @return a new mutable map keyed by {@link ProducerConfig} constants
         */
        public Map<String, Object> getProductorConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
            return props;
        }
    }

    四、注入消费者

    /**
     * Consumer-side Kafka configuration.
     *
     * <p>Reads the {@code spring.kafka.*} consumer properties and exposes a
     * {@link KafkaConsumer} bean named {@code userConsumer} that is already subscribed
     * to {@code topic.people}.
     */
    @Configuration
    public class ConsumerConfiguration {

        /** Whether offsets are committed automatically. */
        @Value("${spring.kafka.consumer.enable-auto-commit}")
        private boolean autoCommit;

        /** Auto-commit interval; handed to Kafka unparsed as {@code auto.commit.interval.ms}. */
        @Value("${spring.kafka.consumer.auto-commit-interval}")
        private String autoCommitInterval;

        /** Offset reset policy ({@code spring.kafka.consumer.auto-offset-reset}). */
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String offsetReset;

        /** Fully qualified key deserializer class name. */
        @Value("${spring.kafka.consumer.key-deserializer}")
        private String keyDeserializer;

        /** Fully qualified value deserializer class name. */
        @Value("${spring.kafka.consumer.value-deserializer}")
        private String valueDeserializer;

        /** Broker address list ({@code spring.kafka.bootstrap-servers}). */
        @Value("${spring.kafka.bootstrap-servers}")
        private String server;

        /**
         * Creates a consumer subscribed to {@code topic.people}.
         *
         * <p>The Kafka documentation states that {@code KafkaConsumer} is not safe for
         * multi-threaded access, so code sharing this bean must serialize its calls.
         *
         * @return a consumer built from {@link #getConfig()} and subscribed to the topic
         */
        @Bean("userConsumer")
        public KafkaConsumer<String, String> getUserConsumer() {
            List<String> topics = new ArrayList<>();
            topics.add("topic.people");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConfig());
            consumer.subscribe(topics);
            return consumer;
        }

        /**
         * Builds a consumer factory from {@link #getConfig()}.
         *
         * <p>The factory is parameterized (it was a raw type before) so that the declared
         * return type is satisfied without an unchecked conversion. The misspelled method
         * name is kept so existing callers still compile.
         *
         * @return a new consumer factory for String keys and String values
         */
        public ConsumerFactory<String, String> getFaactory() {
            return new DefaultKafkaConsumerFactory<>(getConfig());
        }

        /**
         * Collects the injected settings into the property map Kafka expects.
         *
         * @return a new mutable map keyed by {@link ConsumerConfig} constants
         */
        public Map<String, Object> getConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
            // The consumer group id is fixed to "2" rather than read from configuration.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "2");
            return props;
        }
    }

    五、发送消息和消费消息

    /**
     * Demo REST endpoints that publish a sample {@code People} message to Kafka and
     * poll messages back from the injected consumer.
     */
    @RestController
    public class KafkaController {

        /** Template used to publish to its configured default topic. */
        @Resource
        KafkaTemplate<String,String> peopleTemplate;

        /** Consumer shared by every request handled by this controller. */
        @Resource
        KafkaConsumer<String,String> userConsumer;

        /**
         * Serializes a sample {@code People} object to JSON and sends it to the
         * template's default topic.
         *
         * <p>The value returned by {@code sendDefault} is ignored, so the response does not
         * reflect whether the broker actually accepted the record.
         *
         * @return a fixed confirmation string
         */
        @RequestMapping("/send")
        public String sendMessage() {
            People people = new People();
            people.setAddress("北京通州");
            people.setAge(24);
            people.setName("小白");
            people.setSex(1);

            peopleTemplate.sendDefault(JSON.toJSONString(people));
            return "发送成功";
        }

        /**
         * Polls the consumer once, waiting at most 100 ms, and prints each record value
         * to standard output.
         *
         * @return a fixed confirmation string, whether or not any records were received
         */
        @RequestMapping("getmesssage")
        public String getMessage() {
            ConsumerRecords<String, String> records;
            // KafkaConsumer is documented as not thread-safe, while a controller is called from
            // many request threads; serialize access so concurrent requests cannot overlap.
            synchronized (userConsumer) {
                // poll(Duration) replaces the deprecated poll(long), which may block longer than
                // the timeout while waiting for metadata or partition assignment.
                records = userConsumer.poll(java.time.Duration.ofMillis(100));
            }

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
            return "获取消息";
        }
    }

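    结果

    访问 /send 接口会向主题 topic.people 发送一条 People 对象的 JSON 消息,并返回"发送成功";随后访问 /getmesssage 接口,控制台会打印本次拉取到的消息内容,接口返回"获取消息"。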

     本文简单粗暴,爱看不看

  • 相关阅读:
    获取移动端屏幕的宽度的方法
    document.body 与 document.documentElement区别介绍
    鼠标放上去会变化
    获取当前日期的函数
    侧边栏和右边高度的问题
    【Ajax】接收后台数据在html页面显示
    JS获取屏幕的大小
    python——字符串的方法及注释&字符串格式化符号含义及转义字符含义
    python——元组
    python——列表操作符
  • 原文地址:https://www.cnblogs.com/Tassdar/p/12158026.html
Copyright © 2011-2022 走看看