zoukankan      html  css  js  c++  java
  • RabbitMQ入门_10_队列长度限制

    参考资料:https://www.rabbitmq.com/maxlength.html

    RabbitMQ 有两种方式限制队列长度,第一种是对队列中消息总数进行限制:

    gordon.study.rabbitmq.features.TestQueueLengthLimit.java

    public class TestQueueLengthLimit {
     
        private static final String QUEUE_NAME = "queueLengthLimit";
     
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel senderChannel = connection.createChannel();
            Channel consumerChannel = connection.createChannel();
     
            // 设置队列最大消息数量为5
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-max-length", 5);
            senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
            // 发布6个消息
            for (int i = 0; i < 6;) {
                String message = "NO. " + ++i;
                senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            }
     
            // 获取的消息为 NO. 2,说明队列头部第一条消息被抛弃
            Thread.sleep(100);
            GetResponse resp = consumerChannel.basicGet(QUEUE_NAME, false);
            String message = new String(resp.getBody(), "UTF-8");
            System.out.printf("consume: %s
    ", message);
            System.out.printf("queue size: %d
    ", resp.getMessageCount());
     
            // 现在队列中有4个 Ready消息,1个 Unacked消息。此时再发布两条消息,应该只有 NO. 3 被抛弃。
            senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 7".getBytes("UTF-8"));
            senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 8".getBytes("UTF-8"));
            Thread.sleep(100);
            GetResponse resp2 = consumerChannel.basicGet(QUEUE_NAME, false);
            message = new String(resp2.getBody(), "UTF-8");
            System.out.printf("consume: %s
    
    ", message);
     
            // 现在队列中有4个 Ready消息,2个 Unacked消息。
            // 此时Nack,消息2、4取消退回队列头导致队列消息数量超过设定值,谁能留下?
            consumerChannel.basicNack(resp2.getEnvelope().getDeliveryTag(), true, true);
            Thread.sleep(100);
            while (true) {
                resp = consumerChannel.basicGet(QUEUE_NAME, true);
                if (resp == null) {
                    break;
                } else {
                    message = new String(resp.getBody(), "UTF-8");
                    System.out.printf("consume: %s
    ", message);
                }
            }
        }
    }
    

    顾名思义,x-max-length 参数限制了一个队列的消息总数,当消息总数达到限定值时,队列头的消息会被抛弃

    此外,处于 Unacked 状态的消息不纳入消息总数计算。但是,当 Unacked 消息被 reject 并重新入队时,就会受 x-max-length 参数限制,可能回不了队列。


    第二种是对队列中消息体总字节数进行限制:

            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-max-length-bytes ", 1000);
            senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
    

    只计算消息体的字节数,不算消息头、消息属性等字节数。

  • 相关阅读:
    getopt( )和 getopt_long( )
    关于跳跃表 转
    进程控制块的存放和当前进程的确定
    BUAA_OO_2020_Unit1 Summary
    熟悉常用的Linux操作
    编译原理
    词法分析
    组合数据类型练习
    实验一 词法分析实验
    简化版C语言文法
  • 原文地址:https://www.cnblogs.com/gordonkong/p/6978567.html
Copyright © 2011-2022 走看看