zoukankan      html  css  js  c++  java
  • RabbitMQ入门-竞争消费者模式

    上一篇讲了个 哈喽World,现在来看看如果存在多个消费者的情况。

    生产者:

    package com.example.demo;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 竞争消费者模式
     */
    public class CompetingSend {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);    // 声明队列,只有他不存在的时候创建
            String msg = "Hello World!";
            // 发送多条消息
            for (int i = 0; i < 5; i++){
                channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
                System.out.println("Sending:" + msg);
            }
    
            channel.close();
            connection.close();
    
        }
    }

    消费者:

    package com.example.demo;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 一个生产者,多个消费者
     */
    public class CompetingReceiveA {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);    // 声明队列,只有他不存在的时候创建
    
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String recv = new String(body, "UTF-8");
                    System.out.println("Receive:" + recv);
                    try {
                        doWork(recv);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("Done");
                    }
                }
            };
    
            // true代表接收到消息后,给兔子发消息,让这条消息失效
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
        // 模拟每条消息处理时间不一样
        private static void doWork(String msg) throws InterruptedException {
            char c = msg.charAt(msg.length() - 1);
            for (int i = 0; i < Integer.parseInt(c+""); i++)
                Thread.sleep(1000);
        }
    
    }

    先启动两个消费者,再启动生产者,查看控制台:

    消费者A

    消费者B

    生产者(这里不必有疑问,这里打印的是修改之前的消息)

     要说明的是什么观点呢?

    默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。一般来说,每个消费者得到的消息是一样多。但是,并不是说每个消费者的任务重量是平均的。很有可能出现A总在处理耗时任务,B一直吃西瓜的情况。

    因为兔子不知道每个消息的耗时,他就会傻傻的派遣任务。

     不过,官方也有解决办法。

    为了解决这个问题,我们可以使用basicQos方法,设置prefetchCount = 1这告诉RabbitMQ不要向消费者发送多于一条消息。换句话说,在它处理并确认了前一个消息之前,不要向工作人员发送新消息。

    如果当前消费者正在忙碌(没有确认消息),它会将其分派给空闲下一个消费者。

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
  • 相关阅读:
    【原创】颜色替换的递归算法
    【原创】Hacker学习发展流程图 V1.0
    【转载】基数排序
    【翻译】利用加速度求解位置的算法——三轴传感器
    js高级程序设计——笔记
    java中的多线程——进度1
    数据结构和算法——进度1
    java String字符串——进度1
    java中运算符——进度1
    jquery的插件机制
  • 原文地址:https://www.cnblogs.com/LUA123/p/8472070.html
Copyright © 2011-2022 走看看