zoukankan      html  css  js  c++  java
  • RabbitMQ入门-竞争消费者模式

    上一篇讲了个 哈喽World,现在来看看如果存在多个消费者的情况。

    生产者:

    package com.example.demo;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 竞争消费者模式
     */
    public class CompetingSend {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);    // 声明队列,只有他不存在的时候创建
            String msg = "Hello World!";
            // 发送多条消息
            for (int i = 0; i < 5; i++){
                channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
                System.out.println("Sending:" + msg);
            }
    
            channel.close();
            connection.close();
    
        }
    }

    消费者:

    package com.example.demo;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 一个生产者,多个消费者
     */
    public class CompetingReceiveA {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);    // 声明队列,只有他不存在的时候创建
    
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String recv = new String(body, "UTF-8");
                    System.out.println("Receive:" + recv);
                    try {
                        doWork(recv);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("Done");
                    }
                }
            };
    
            // true代表接收到消息后,给兔子发消息,让这条消息失效
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
        // 模拟每条消息处理时间不一样
        private static void doWork(String msg) throws InterruptedException {
            char c = msg.charAt(msg.length() - 1);
            for (int i = 0; i < Integer.parseInt(c+""); i++)
                Thread.sleep(1000);
        }
    
    }

    先启动两个消费者,再启动生产者,查看控制台:

    消费者A

    消费者B

    生产者(这里不必有疑问,这里打印的是修改之前的消息)

     要说明的是什么观点呢?

    默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。一般来说,每个消费者得到的消息是一样多。但是,并不是说每个消费者的任务重量是平均的。很有可能出现A总在处理耗时任务,B一直吃西瓜的情况。

    因为兔子不知道每个消息的耗时,他就会傻傻的派遣任务。

     不过,官方也有解决办法。

    为了解决这个问题,我们可以使用basicQos方法,设置prefetchCount = 1这告诉RabbitMQ不要向消费者发送多于一条消息。换句话说,在它处理并确认了前一个消息之前,不要向工作人员发送新消息。

    如果当前消费者正在忙碌(没有确认消息),它会将其分派给空闲下一个消费者。

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
  • 相关阅读:
    heat模板
    Leetcode812.Largest Triangle Area最大三角形面积
    Leetcode812.Largest Triangle Area最大三角形面积
    Leetcode811.Subdomain Visit Count子域名访问计数
    Leetcode811.Subdomain Visit Count子域名访问计数
    Leetcode806.Number of Lines To Write String写字符串需要的行数
    Leetcode806.Number of Lines To Write String写字符串需要的行数
    Leetcode819.Most Common Word最常见的单词
    Leetcode819.Most Common Word最常见的单词
    Leetcode783.Minimum Distance Between BST Nodes二叉搜索树结点最小距离
  • 原文地址:https://www.cnblogs.com/LUA123/p/8472070.html
Copyright © 2011-2022 走看看