http://activemq.apache.org/consumer-priority.html consumer 优先级
http://activemq.apache.org/activemq-message-properties.html 消息优先级
1、设置 consumer 的优先级:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10"); consumer = session.createConsumer(queue);
priority 的取值从0到127。broker 按照 consumer 的优先级给 queue 的 consumers 排序,首先把消息分发给优先级最高的 consumer。一旦该 consumer 的 prefetch buffer 满了,broker 就把消息分发给优先级次高的,prefetch buffer 不满的 consumer。
// org.apache.activemq.broker.region.Queue // consumer priority 的比较器 private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() { @Override public int compare(Subscription s1, Subscription s2) { // We want the list sorted in descending order // 倒序,即数值大的优先级高 int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority(); if (val == 0 && messageGroupOwners != null) { // then ascending order of assigned message groups to favour less loaded consumers // Long.compare in jdk7 long x = s1.getConsumerInfo().getLastDeliveredSequenceId(); long y = s2.getConsumerInfo().getLastDeliveredSequenceId(); val = (x < y) ? -1 : ((x == y) ? 0 : 1); } return val; } }; // 添加 consumer 的时候,会触发排序 // 在 consumers 列表中,靠前的 consumer,先分发消息 private void addToConsumerList(Subscription sub) { if (useConsumerPriority) { consumers.add(sub); Collections.sort(consumers, orderedCompare); } else { consumers.add(sub); } }
2、设置 message 的优先级需要在 broker 端和 producer 端配置:
2.1 在 broker 端设置 TEST.BAT 队列为 prioritizedMessages = "true"
<policyEntry queue="TEST.BAT" prioritizedMessages="true" producerFlowControl="true" memoryLimit="1mb"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="TEST"/> </deadLetterStrategy> <pendingQueuePolicy> <storeCursor/> </pendingQueuePolicy> </policyEntry>
2.2 producer 发送消息时,设置 message 的优先级
TextMessage message = session.createTextMessage(text);
producer.send(destination, message, DeliveryMode.NON_PERSISTENT, 1, 0);
设置 message 的优先级,需要调用:
void javax.jms.MessageProducer.send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException
而不能这么写:
TextMessage message = session.createTextMessage(text);
message.setJMSPriority(0);
初步看是 ActiveMQ 的 bug。消息的 priority 值,从0到9。消息配置了优先级之后,消息存放在 PrioritizedPendingList 中。
// 省略部分代码 private class PrioritizedPendingListIterator implements Iterator<MessageReference> { private int index = 0; private int currentIndex = 0; List<PendingNode> list = new ArrayList<PendingNode>(size()); PrioritizedPendingListIterator() { for (int i = MAX_PRIORITY - 1; i >= 0; i--) { // priority 值大的优先级高 OrderedPendingList orderedPendingList = lists[i]; if (!orderedPendingList.isEmpty()) { list.addAll(orderedPendingList.getAsList()); } } } }