zoukankan      html  css  js  c++  java
  • SpringBoot2.x下RabbitMQ的并发参数(concurrency和prefetch)

     

    RabbitMQ消费端配置

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        listener:
          simple:
    #        acknowledge-mode: manual  # 手动确定(默认自动确认)
            concurrency: 1 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)
            max-concurrency: 10 # 消费端的监听最大个数
            prefetch: 10
        connection-timeout: 15000   # 超时时间
    

    在消费端,配置prefetchconcurrency参数便可以实现消费端MQ并发处理消息,那么这两个参数到底有什么含义??

    1. prefetch

    每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果ack模式为none,则忽略。如有必要,将增加此值以匹配txSize或messagePerAck。从2.0开始默认为250;设置为1将还原为以前的行为。

    prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

    不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。

    对于低容量消息和多个消费者的情况(也包括单listener容器的concurrency配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1

    模拟:
    生产者每次生产10条消息:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        publisher-confirms: true
        publisher-returns: true
    
    @RestController
    public class RabbitMQController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //直接向队列中发送数据
        @GetMapping("send")
        public String send() {
            for (int i = 0; i < 10; i++) {
                String content = "Date:" + System.currentTimeMillis();
                content = content + ":::" + i;
                rabbitTemplate.convertAndSend("kinson", content);
            }
            return "success";
        }
    }
    

    控制页面:


     
    消费端直接预取了10条消息.png

    2. concurrency

    上面配置中,concurrency =1,即每个Listener容器将开启一个线程去处理消息。

    在2.0版本后,可以在注解中配置该参数:

    @Component
    @Slf4j
    public class CustomerRev {
      //会覆盖配置文件中的参数。
        @RabbitListener(queues = {"kinson"},concurrency =   "2")
        public void receiver(Message msg, Channel channel) throws InterruptedException {
    
    //        Thread.sleep(10000);
            byte[] messageBytes = msg.getBody();
    
            if (messageBytes != null && messageBytes.length > 0) {
                //打印数据
                String message = new String(msg.getBody(), StandardCharsets.UTF_8);
                log.info("【消3】:{}", message);
            }
        }
    
    }
    

    启动服务:

     
    可以看到MQ有两个消费者.png

    即该Listener容器产生了两个线程去消费queue。如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。

     




  • 相关阅读:
    最新Navicat Premium12 破解方法,亲测可用
    (转)Navicat_12安装与破解激活,注册机亲测可用
    使用ApiPost模拟发送get、post、delete、put等http请求
    模拟POST、Get 请求的工具----APIpost(中文版POSTMAN)
    推荐一款接口文档生成工具,apipost,好用
    作为后端开发者,如何更优雅、便捷的生成接口文档?
    使用apipost调试api接口并快速生成接口文档的一些小技巧,比postman更好用
    中文版postman——apipost,不试一下,你就不知道它有多香
    ApiPost如何在预执行脚本里添加请求参数?
    ApiPost的预执行脚本和后执行脚本
  • 原文地址:https://www.cnblogs.com/liukunjava/p/13163951.html
Copyright © 2011-2022 走看看