参考资料:https://www.rabbitmq.com/maxlength.html
RabbitMQ 有两种方式限制队列长度,第一种是对队列中消息总数进行限制:
gordon.study.rabbitmq.features.TestQueueLengthLimit.java
public class TestQueueLengthLimit {
private static final String QUEUE_NAME = "queueLengthLimit";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel senderChannel = connection.createChannel();
Channel consumerChannel = connection.createChannel();
// 设置队列最大消息数量为5
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 5);
senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
// 发布6个消息
for (int i = 0; i < 6;) {
String message = "NO. " + ++i;
senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
// 获取的消息为 NO. 2,说明队列头部第一条消息被抛弃
Thread.sleep(100);
GetResponse resp = consumerChannel.basicGet(QUEUE_NAME, false);
String message = new String(resp.getBody(), "UTF-8");
System.out.printf("consume: %s
", message);
System.out.printf("queue size: %d
", resp.getMessageCount());
// 现在队列中有4个 Ready消息,1个 Unacked消息。此时再发布两条消息,应该只有 NO. 3 被抛弃。
senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 7".getBytes("UTF-8"));
senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 8".getBytes("UTF-8"));
Thread.sleep(100);
GetResponse resp2 = consumerChannel.basicGet(QUEUE_NAME, false);
message = new String(resp2.getBody(), "UTF-8");
System.out.printf("consume: %s
", message);
// 现在队列中有4个 Ready消息,2个 Unacked消息。
// 此时Nack,消息2、4取消退回队列头导致队列消息数量超过设定值,谁能留下?
consumerChannel.basicNack(resp2.getEnvelope().getDeliveryTag(), true, true);
Thread.sleep(100);
while (true) {
resp = consumerChannel.basicGet(QUEUE_NAME, true);
if (resp == null) {
break;
} else {
message = new String(resp.getBody(), "UTF-8");
System.out.printf("consume: %s
", message);
}
}
}
}
顾名思义,x-max-length 参数限制了一个队列的消息总数,当消息总数达到限定值时,队列头的消息会被抛弃。
此外,处于 Unacked 状态的消息不纳入消息总数计算。但是,当 Unacked 消息被 reject 并重新入队时,就会受 x-max-length 参数限制,可能回不了队列。
第二种是对队列中消息体总字节数进行限制:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length-bytes ", 1000);
senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
只计算消息体的字节数,不算消息头、消息属性等字节数。