zoukankan      html  css  js  c++  java
  • springboot系列八、springboot整合kafka

    背景:

      当业务在同一时间出现高并发的时候,这个时候我们不想无限的增加服务器,但是又想提高吞吐量。这时可以考虑使用消息异步处理,进行消峰填谷;同时还可以降低耦合度。常见的消息中间件有kafka,rabbitMQ,activeMQ,rocketMQ。其中性能最好的,吞吐量最高的是以kafka为代表,下面介绍kafka用法。kafka详细原理介绍,参考kafka系列:https://www.cnblogs.com/wangzhuxing/category/1351802.html。

    一、引入依赖

    <!--kafka支持-->
    <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
    </dependency>

    二、配置yml

    spring:
       kafka:     # 指定kafka 代理地址,可以多个
          bootstrap-servers: 47.52.199.52:9092
          template:    # 指定默认topic id
            default-topic: producer
          listener:   # 指定listener 容器中的线程数,用于提高并发量
            concurrency: 5
          consumer:
            group-id: myGroup # 指定默认消费者group id
            client-id: 200
            max-poll-records: 200
            auto-offset-reset: earliest # 最早未被消费的offset
          producer:
            batch-size: 1000 # 每次批量发送消息的数量
            retries: 3
            client-id: 200

    三、生成者使用示例

    package com.example.demo.kafka;
    
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    
    import java.util.concurrent.ExecutionException;
    
    @Component
    public class Producer {
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        /**
         * 发送消息到kafka
         */
        public RecordMetadata sendChannelMess(String topic, String message) {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,message);
            RecordMetadata recordMetadata = null;
            try {
                recordMetadata = future.get().getRecordMetadata();
            } catch (InterruptedException|ExecutionException e) {
                e.printStackTrace();
                System.out.println("发送失败");
            }
            System.out.println("发送成功");
            System.out.println("partition:"+recordMetadata.partition());
            System.out.println("offset:"+recordMetadata.offset());
            System.out.println("topic:"+recordMetadata.topic());
    
            return recordMetadata;
        }
    }

    四、消费者使用示例

    package com.example.demo.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    @Component
    public class Consumer {
    
        /**
         * 有消息就读取,只读取消息value
         */
        @KafkaListener(topics = {"test13"})
        public void receiveMessage(String message){
            //收到通道的消息之后执行秒杀操作
            System.out.println(message);
        }
    
        /**
         * 有消息就读取,批量读取消息value
         */
        @KafkaListener(topics = "test12")
        public void onMessage(List<String> crs) {
            for(String str : crs){
                System.out.println("test12:" + str);
            }
        }
    
        /**
         * 有消息就读取,读取消息topic,offset,key,value等信息
         */
        @KafkaListener(topics = "test14")
        public void listenT1(ConsumerRecord<?, ?> cr){
            System.out.println("listenT1收到消息,topic:>>>" + cr.topic() + "  offset:>>" + cr.offset()+ "  key:>>" + cr.key() + "  value:>>" + cr.value());
        }
    }
  • 相关阅读:
    Dual Quaternion Knowledge Graph Embeddings——对偶四元数知识图谱嵌入
    Python argparse模块
    Pythonvirtualenv创建虚拟环境
    KBGAN:用于知识图谱嵌入的对抗学习
    anaconda简单使用
    二分查找详解
    Quaternion Knowledge Graph Embeddings —— 基于四元数的知识图谱嵌入
    ConvR——用于多关系学习的自适应卷积模型
    Under Any Linux: install bypy tool
    PRUNE_BIND_MOUNTS="yes"不好用, 什么原因呢?
  • 原文地址:https://www.cnblogs.com/wangzhuxing/p/10186666.html
Copyright © 2011-2022 走看看